From 4264a973d1163eeffe92918beb73d98e0a042d36 Mon Sep 17 00:00:00 2001
From: Woosuk Kwon
Date: Thu, 2 Jan 2025 09:00:23 -0800
Subject: [PATCH 1/4] [V1] Add BlockTable abstraction

Signed-off-by: Woosuk Kwon
---
 vllm/v1/worker/gpu_block_table.py  | 77 ++++++++++++++++++++++++++++++
 vllm/v1/worker/gpu_input_batch.py  | 24 ++++------
 vllm/v1/worker/gpu_model_runner.py | 13 ++---
 3 files changed, 91 insertions(+), 23 deletions(-)
 create mode 100644 vllm/v1/worker/gpu_block_table.py

diff --git a/vllm/v1/worker/gpu_block_table.py b/vllm/v1/worker/gpu_block_table.py
new file mode 100644
index 0000000000000..4b302750a8e94
--- /dev/null
+++ b/vllm/v1/worker/gpu_block_table.py
@@ -0,0 +1,77 @@
+from typing import List
+
+import numpy as np
+import torch
+
+from vllm.logger import init_logger
+
+logger = init_logger(__name__)
+
+
+class BlockTable:
+
+    def __init__(
+        self,
+        max_num_reqs: int,
+        max_model_len: int,
+        max_num_blocks_per_req: int,
+        pin_memory: bool,
+        device: torch.device,
+    ):
+        self.max_num_reqs = max_num_reqs
+        self.max_model_len = max_model_len
+        self.max_num_blocks_per_req = max_num_blocks_per_req
+        self.pin_memory = pin_memory
+        self.device = device
+
+        self.block_table = torch.zeros(
+            (max_num_reqs, max_num_blocks_per_req),
+            device=self.device,
+            dtype=torch.int32,
+        )
+        self.block_table_cpu = torch.zeros(
+            (max_num_reqs, max_num_blocks_per_req),
+            device="cpu",
+            dtype=torch.int32,
+            pin_memory=pin_memory,
+        )
+        self.block_table_np = self.block_table_cpu.numpy()
+        self.num_blocks_per_row = np.zeros(max_num_reqs, dtype=np.int32)
+
+    def add_row(self, row_idx: int, block_ids: List[int]) -> None:
+        num_blocks = len(block_ids)
+        self.block_table_np[row_idx, :num_blocks] = block_ids
+        self.num_blocks_per_row[row_idx] = num_blocks
+
+    def append_row(
+        self,
+        row_idx: int,
+        start: int,
+        block_ids: List[int],
+    ) -> None:
+        num_blocks = len(block_ids)
+        self.block_table_np[row_idx, start:start + num_blocks] = block_ids
+        self.num_blocks_per_row[row_idx] = start + num_blocks
+
+    def move_row(self, src: int, tgt: int) -> None:
+        num_blocks = self.num_blocks_per_row[src]
+        self.block_table_np[tgt, :num_blocks] = self.block_table_np[
+            src, :num_blocks]
+        self.num_blocks_per_row[tgt] = num_blocks
+
+    def commit(self, num_reqs: int) -> None:
+        self.block_table[:num_reqs].copy_(self.block_table_cpu[:num_reqs],
+                                          non_blocking=True)
+
+    def clear(self) -> None:
+        self.block_table.fill_(0)
+        self.block_table_cpu.fill_(0)
+
+    def cuda(self) -> torch.Tensor:
+        return self.block_table
+
+    def cpu(self) -> torch.Tensor:
+        return self.block_table_cpu
+
+    def numpy(self) -> np.ndarray:
+        return self.block_table_np
diff --git a/vllm/v1/worker/gpu_input_batch.py b/vllm/v1/worker/gpu_input_batch.py
index e79145300fe06..1cb3560b54651 100644
--- a/vllm/v1/worker/gpu_input_batch.py
+++ b/vllm/v1/worker/gpu_input_batch.py
@@ -9,6 +9,7 @@
 from vllm.multimodal import MultiModalKwargs
 from vllm.sampling_params import SamplingParams, SamplingType
 from vllm.v1.sample.metadata import SamplingMetadata
+from vllm.v1.worker.gpu_block_table import BlockTable

 if TYPE_CHECKING:
     from vllm.multimodal.inputs import PlaceholderRange
@@ -69,19 +70,14 @@ def __init__(
         self.num_computed_tokens_cpu = np.empty(max_num_reqs, dtype=np.int32)
         self.num_prompt_tokens = np.zeros(max_num_reqs, dtype=np.int32)

-        # Attention-related.
-        self.block_table = torch.zeros(
-            (max_num_reqs, max_num_blocks_per_req),
-            device=self.device,
-            dtype=torch.int32,
-        )
-        self.block_table_cpu_tensor = torch.zeros(
-            (max_num_reqs, max_num_blocks_per_req),
-            device="cpu",
-            dtype=torch.int32,
+        # Block table.
+        self.block_table = BlockTable(
+            max_num_reqs=max_num_reqs,
+            max_model_len=max_model_len,
+            max_num_blocks_per_req=max_num_blocks_per_req,
             pin_memory=pin_memory,
+            device=device,
         )
-        self.block_table_cpu = self.block_table_cpu_tensor.numpy()

         # Sampling-related.
         self.temperature = torch.empty((max_num_reqs, ),
@@ -191,8 +187,7 @@ def add_request(
             start_idx:end_idx] = request.output_token_ids

         self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
-        num_blocks = len(request.block_ids)
-        self.block_table_cpu[req_index, :num_blocks] = request.block_ids
+        self.block_table.add_row(req_index, request.block_ids)

         sampling_params = request.sampling_params
         self.temperature_cpu[req_index] = sampling_params.temperature
@@ -298,8 +293,7 @@ def condense(self, empty_req_indices: List[int]) -> None:
                 self.num_prompt_tokens[last_req_index]
             self.num_computed_tokens_cpu[
                 empty_index] = self.num_computed_tokens_cpu[last_req_index]
-            self.block_table_cpu[empty_index] = self.block_table_cpu[
-                last_req_index]
+            self.block_table.move_row(last_req_index, empty_index)
             self.temperature_cpu[empty_index] = self.temperature_cpu[
                 last_req_index]
             self.top_p_cpu[empty_index] = self.top_p_cpu[last_req_index]
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 995de54e8e0a0..e77686700053b 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -211,10 +211,9 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> None:
             if num_new_blocks == 0:
                 continue
             start_index = len(req_state.block_ids)
-            end_index = start_index + num_new_blocks
             req_state.block_ids.extend(req_data.new_block_ids)
-            self.input_batch.block_table_cpu[
-                req_index, start_index:end_index] = req_data.new_block_ids
+            self.input_batch.block_table.append_row(req_index, start_index,
+                                                    req_data.new_block_ids)

         req_ids_to_add: List[str] = []
         # Add new requests to the cached states.
@@ -275,9 +274,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"):

         # OPTIMIZATION: Start copying the block table first.
         # This way, we can overlap the copy with the following CPU operations.
-        self.input_batch.block_table[:num_reqs].copy_(
-            self.input_batch.block_table_cpu_tensor[:num_reqs],
-            non_blocking=True)
+        self.input_batch.block_table.commit(num_reqs)

         # Get the number of scheduled tokens for each request.
         # TODO: The Python loop can be slow. Optimize.
@@ -333,7 +330,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"):
         # NOTE(woosuk): We use torch.index_select instead of np.take here
         # because torch.index_select is much faster than np.take for large
         # tensors.
-        block_numbers = (self.input_batch.block_table_cpu_tensor.flatten()
+        block_numbers = (self.input_batch.block_table.cpu().flatten()
                          [block_table_indices].numpy())
         block_offsets = positions_np % self.block_size
         np.add(block_numbers * self.block_size,
@@ -450,7 +447,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"):
             query_start_loc=query_start_loc,
             max_seq_len=max_seq_len,
             seq_start_loc=seq_start_loc,
-            block_table=self.input_batch.block_table[:num_reqs],
+            block_table=self.input_batch.block_table.cuda()[:num_reqs],
             slot_mapping=slot_mapping,
             use_cascade=use_cascade,
             common_prefix_len=common_prefix_len,
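The BlockTable introduced above keeps three synchronized views of one table: a device tensor, a pinned CPU tensor, and a numpy view of that CPU tensor. All row mutations go through the numpy view on the CPU; commit() then pushes the first num_reqs rows to the device in a single non-blocking copy. A minimal usage sketch follows; the sizes, block IDs, and CPU device are illustrative assumptions, not values from the patch (pin_memory is disabled because pinning requires CUDA):

    import torch

    from vllm.v1.worker.gpu_block_table import BlockTable  # module path as of this patch

    # Hypothetical sizes; a CPU "device" tensor lets the sketch run anywhere.
    table = BlockTable(max_num_reqs=4, max_model_len=2048,
                       max_num_blocks_per_req=128, pin_memory=False,
                       device=torch.device("cpu"))

    table.add_row(0, [7, 8, 9])    # new request arrives with three KV-cache blocks
    table.append_row(0, 3, [10])   # a fourth block is allocated later
    table.move_row(0, 1)           # copy row 0 into row 1, as condense() does
    table.commit(num_reqs=2)       # one non-blocking copy covers both rows
    print(table.numpy()[:2, :4])   # CPU-side view used during input preparation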
From b181413b8456b5deefcca58c92593657937dd30b Mon Sep 17 00:00:00 2001
From: Woosuk Kwon
Date: Thu, 2 Jan 2025 09:03:38 -0800
Subject: [PATCH 2/4] Minor

Signed-off-by: Woosuk Kwon
---
 vllm/v1/worker/gpu_block_table.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/vllm/v1/worker/gpu_block_table.py b/vllm/v1/worker/gpu_block_table.py
index 4b302750a8e94..15248f0112e71 100644
--- a/vllm/v1/worker/gpu_block_table.py
+++ b/vllm/v1/worker/gpu_block_table.py
@@ -38,11 +38,6 @@ def __init__(
         self.block_table_np = self.block_table_cpu.numpy()
         self.num_blocks_per_row = np.zeros(max_num_reqs, dtype=np.int32)

-    def add_row(self, row_idx: int, block_ids: List[int]) -> None:
-        num_blocks = len(block_ids)
-        self.block_table_np[row_idx, :num_blocks] = block_ids
-        self.num_blocks_per_row[row_idx] = num_blocks
-
     def append_row(
         self,
         row_idx: int,
@@ -53,6 +48,9 @@ def append_row(
         self.block_table_np[row_idx, start:start + num_blocks] = block_ids
         self.num_blocks_per_row[row_idx] = start + num_blocks

+    def add_row(self, row_idx: int, block_ids: List[int]) -> None:
+        self.append_row(row_idx, 0, block_ids)
+
     def move_row(self, src: int, tgt: int) -> None:
         num_blocks = self.num_blocks_per_row[src]
         self.block_table_np[tgt, :num_blocks] = self.block_table_np[
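This refactor makes add_row a special case of append_row, so the num_blocks_per_row bookkeeping lives in one place. A standalone sketch of the equivalence the patch relies on; the _Rows class is a hypothetical stand-in for BlockTable, not code from the tree:

    from typing import List

    import numpy as np

    class _Rows:
        # Hypothetical stand-in for BlockTable's row bookkeeping.
        def __init__(self, num_rows: int, row_len: int):
            self.table = np.zeros((num_rows, row_len), dtype=np.int32)
            self.lens = np.zeros(num_rows, dtype=np.int32)

        def append_row(self, row: int, start: int, ids: List[int]) -> None:
            self.table[row, start:start + len(ids)] = ids
            self.lens[row] = start + len(ids)

        def add_row(self, row: int, ids: List[int]) -> None:
            # Writing a fresh row is just appending at offset 0.
            self.append_row(row, 0, ids)

    rows = _Rows(num_rows=2, row_len=8)
    rows.add_row(0, [3, 1])
    rows.append_row(0, 2, [4])
    assert rows.lens[0] == 3
    assert list(rows.table[0, :3]) == [3, 1, 4]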
From 66b6f813571c59a68d3516072e9fd28723a063b0 Mon Sep 17 00:00:00 2001
From: Woosuk Kwon
Date: Thu, 2 Jan 2025 16:29:46 -0800
Subject: [PATCH 3/4] Make BlockTable hardware agnostic

Signed-off-by: Woosuk Kwon
---
 vllm/v1/worker/{gpu_block_table.py => block_table.py} | 5 ++++-
 vllm/v1/worker/gpu_input_batch.py                     | 2 +-
 vllm/v1/worker/gpu_model_runner.py                    | 2 +-
 3 files changed, 6 insertions(+), 3 deletions(-)
 rename vllm/v1/worker/{gpu_block_table.py => block_table.py} (90%)

diff --git a/vllm/v1/worker/gpu_block_table.py b/vllm/v1/worker/block_table.py
similarity index 90%
rename from vllm/v1/worker/gpu_block_table.py
rename to vllm/v1/worker/block_table.py
index 15248f0112e71..d303616da34dd 100644
--- a/vllm/v1/worker/gpu_block_table.py
+++ b/vllm/v1/worker/block_table.py
@@ -65,11 +65,14 @@ def clear(self) -> None:
         self.block_table.fill_(0)
         self.block_table_cpu.fill_(0)

-    def cuda(self) -> torch.Tensor:
+    def to_device(self) -> torch.Tensor:
+        """Returns the device tensor of the block table."""
         return self.block_table

     def cpu(self) -> torch.Tensor:
+        """Returns the CPU tensor of the block table."""
         return self.block_table_cpu

     def numpy(self) -> np.ndarray:
+        """Returns the numpy array of the block table."""
         return self.block_table_np
diff --git a/vllm/v1/worker/gpu_input_batch.py b/vllm/v1/worker/gpu_input_batch.py
index 7fdcee604dcf6..40494e64b22f0 100644
--- a/vllm/v1/worker/gpu_input_batch.py
+++ b/vllm/v1/worker/gpu_input_batch.py
@@ -9,7 +9,7 @@
 from vllm.multimodal import MultiModalKwargs
 from vllm.sampling_params import SamplingParams, SamplingType
 from vllm.v1.sample.metadata import SamplingMetadata
-from vllm.v1.worker.gpu_block_table import BlockTable
+from vllm.v1.worker.block_table import BlockTable

 if TYPE_CHECKING:
     from vllm.multimodal.inputs import PlaceholderRange
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 6c3dfafd34453..728961cc9e1b3 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -447,7 +447,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"):
             query_start_loc=query_start_loc,
             max_seq_len=max_seq_len,
             seq_start_loc=seq_start_loc,
-            block_table=self.input_batch.block_table.cuda()[:num_reqs],
+            block_table=self.input_batch.block_table.to_device()[:num_reqs],
             slot_mapping=slot_mapping,
             use_cascade=use_cascade,
             common_prefix_len=common_prefix_len,
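After this rename, nothing in BlockTable assumes CUDA: the device tensor lives on whatever torch.device the worker passes in, and to_device() replaces the CUDA-specific cuda() accessor. A sketch of how a hypothetical non-GPU worker could reuse the class unchanged; the device string and sizes are illustrative assumptions:

    import torch

    from vllm.v1.worker.block_table import BlockTable

    # A hypothetical CPU-only worker; pin_memory is only useful with CUDA,
    # so it is disabled here.
    table = BlockTable(max_num_reqs=8, max_model_len=4096,
                       max_num_blocks_per_req=256, pin_memory=False,
                       device=torch.device("cpu"))
    table.add_row(0, [1, 2, 3])
    table.commit(num_reqs=1)
    dev = table.to_device()   # same call on CUDA, CPU, or other backends
    assert dev.device.type == "cpu" and int(dev[0, 2]) == 3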
From 233f8445453a62c1be637eeebec2a8249884d69d Mon Sep 17 00:00:00 2001
From: Woosuk Kwon
Date: Sun, 5 Jan 2025 17:39:55 -0800
Subject: [PATCH 4/4] minor

Signed-off-by: Woosuk Kwon
---
 vllm/v1/worker/block_table.py      | 6 +++---
 vllm/v1/worker/gpu_model_runner.py | 7 ++++---
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/vllm/v1/worker/block_table.py b/vllm/v1/worker/block_table.py
index d303616da34dd..26a2084b131fa 100644
--- a/vllm/v1/worker/block_table.py
+++ b/vllm/v1/worker/block_table.py
@@ -65,14 +65,14 @@ def clear(self) -> None:
         self.block_table.fill_(0)
         self.block_table_cpu.fill_(0)

-    def to_device(self) -> torch.Tensor:
+    def get_device_tensor(self) -> torch.Tensor:
         """Returns the device tensor of the block table."""
         return self.block_table

-    def cpu(self) -> torch.Tensor:
+    def get_cpu_tensor(self) -> torch.Tensor:
         """Returns the CPU tensor of the block table."""
         return self.block_table_cpu

-    def numpy(self) -> np.ndarray:
+    def get_numpy_array(self) -> np.ndarray:
         """Returns the numpy array of the block table."""
         return self.block_table_np
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 596d68c923d34..31e693235f99f 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -330,8 +330,8 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"):
         # NOTE(woosuk): We use torch.index_select instead of np.take here
         # because torch.index_select is much faster than np.take for large
         # tensors.
-        block_numbers = (self.input_batch.block_table.cpu().flatten()
-                         [block_table_indices].numpy())
+        block_table_cpu = self.input_batch.block_table.get_cpu_tensor()
+        block_numbers = block_table_cpu.flatten()[block_table_indices].numpy()
         block_offsets = positions_np % self.block_size
         np.add(block_numbers * self.block_size,
                block_offsets,
@@ -447,7 +447,8 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"):
             query_start_loc=query_start_loc,
             max_seq_len=max_seq_len,
             seq_start_loc=seq_start_loc,
-            block_table=self.input_batch.block_table.to_device()[:num_reqs],
+            block_table=(
+                self.input_batch.block_table.get_device_tensor()[:num_reqs]),
             slot_mapping=slot_mapping,
             use_cascade=use_cascade,
             common_prefix_len=common_prefix_len,
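For reference, the slot-mapping computation that get_cpu_tensor() now feeds works on the flattened 2-D table: a token's flat block index is req_index * max_num_blocks_per_req + position // block_size, and its slot is block_number * block_size + position % block_size. A self-contained sketch with hypothetical sizes and block IDs; the real code gathers with torch.index_select for speed, while plain indexing is used here for brevity:

    import numpy as np
    import torch

    block_size = 16
    max_num_blocks_per_req = 4

    # Hypothetical CPU block table for two requests (rows).
    block_table_cpu = torch.tensor([[7, 8, 0, 0],
                                    [3, 0, 0, 0]], dtype=torch.int32)

    # One scheduled token per request: position 17 of request 0 and
    # position 5 of request 1.
    req_indices = np.array([0, 1])
    positions = np.array([17, 5])

    # Flat index into the flattened (num_reqs * max_blocks) table.
    flat = torch.from_numpy(req_indices * max_num_blocks_per_req
                            + positions // block_size)
    block_numbers = block_table_cpu.flatten()[flat].numpy()
    slot_mapping = block_numbers * block_size + positions % block_size
    print(slot_mapping)  # [129  53]: block 8, offset 1; block 3, offset 5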