diff --git a/tests/models/registry.py b/tests/models/registry.py
index 819ef957a07f3..f5a37420a2909 100644
--- a/tests/models/registry.py
+++ b/tests/models/registry.py
@@ -61,6 +61,8 @@ class _HfExamplesInfo:
     "DeepseekForCausalLM": _HfExamplesInfo("deepseek-ai/deepseek-llm-7b-chat"),
     "DeepseekV2ForCausalLM": _HfExamplesInfo("deepseek-ai/DeepSeek-V2-Lite-Chat",  # noqa: E501
                                              trust_remote_code=True),
+    "DeepseekV3ForCausalLM": _HfExamplesInfo("deepseek-ai/DeepSeek-V3",  # noqa: E501
+                                             trust_remote_code=True),
     "ExaoneForCausalLM": _HfExamplesInfo("LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct"),  # noqa: E501
     "FalconForCausalLM": _HfExamplesInfo("tiiuae/falcon-7b"),
     "GemmaForCausalLM": _HfExamplesInfo("google/gemma-2b"),
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index cfdbea8004c35..ba2b8377759d6 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -9,14 +9,13 @@
 from vllm.inputs.preprocess import InputPreprocessor
 from vllm.logger import init_logger
 from vllm.lora.request import LoRARequest
-from vllm.outputs import PoolingRequestOutput, RequestOutput
+from vllm.outputs import RequestOutput
 from vllm.pooling_params import PoolingParams
 from vllm.prompt_adapter.request import PromptAdapterRequest
 from vllm.sampling_params import SamplingParams
 from vllm.transformers_utils.tokenizer import AnyTokenizer
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
-from vllm.v1.engine.async_stream import AsyncStream
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor
@@ -54,10 +53,8 @@ def __init__(
             lora_config=vllm_config.lora_config)
         self.tokenizer.ping()
 
-        # Request streams (map of request_id -> AsyncStream).
-        self.request_streams: Dict[str, AsyncStream] = {}
-        # List of cancelled request ids to be aborted.
-        self.client_aborted_requests: List[str] = []
+        # Request streams (map of request_id -> queue).
+        self.rid_to_queue: Dict[str, asyncio.Queue] = {}
 
         # Processor (converts Inputs --> EngineCoreRequests).
         self.processor = Processor(
@@ -153,14 +150,13 @@ async def add_request(
         trace_headers: Optional[Mapping[str, str]] = None,
         prompt_adapter_request: Optional[PromptAdapterRequest] = None,
         priority: int = 0,
-    ) -> AsyncGenerator[Union[RequestOutput, PoolingRequestOutput], None]:
+    ) -> asyncio.Queue[RequestOutput]:
         """Add new request to the AsyncLLM."""
 
-        if self.detokenizer.is_request_active(request_id):
-            raise ValueError(f"Request {request_id} already exists.")
-
-        # 1) Create a new AsyncStream for the request.
-        stream = self._add_request_to_streams(request_id)
+        # 1) Create a new output queue for the request.
+        if request_id in self.rid_to_queue:
+            raise ValueError(f"Request id {request_id} already running.")
+        self.rid_to_queue[request_id] = asyncio.Queue()
 
         # 2) Convert input --> DetokenizerRequest / EngineCoreRequest.
         detokenizer_req, engine_core_req = self.processor.process_inputs(
@@ -173,8 +169,10 @@
         # 4) Add the EngineCoreRequest to EngineCore (separate process).
         await self.engine_core.add_request_async(engine_core_req)
 
-        # 5) Return the generator.
-        return stream.generator()
+        if self.log_requests:
+            logger.info("Added request %s.", request_id)
+
+        return self.rid_to_queue[request_id]
 
     # TODO: we should support multiple prompts in one call, as you
     # can do with LLM.generate. So that for multi-prompt completion
@@ -194,7 +192,7 @@ async def generate(
         """
         Main function called by the API server to kick off a request
             * 1) Making an AsyncStream corresponding to the Request.
-            # 2) Processing the Input.
+            * 2) Processing the Input.
             * 3) Adding the Request to the Detokenizer.
             * 4) Adding the Request to the EngineCore (separate process).
 
@@ -206,14 +204,15 @@ async def generate(
         returning the RequestOutput back to the caller.
         """
 
-        # We start the output_handler on the first call to generate() so that
-        # we can call __init__ before the event loop starts, which enables us
-        # to handle startup failure gracefully in the OpenAI server.
-        if self.output_handler is None:
-            self.output_handler = asyncio.create_task(
-                self._run_output_handler())
-
-        async for output in await self.add_request(
+        try:
+            # We start the output_handler on the first call to generate() so
+            # we can call __init__ before the event loop, which enables us
+            # to handle startup failure gracefully in the OpenAI server.
+            if self.output_handler is None:
+                self.output_handler = asyncio.create_task(
+                    self._run_output_handler())
+
+            q = await self.add_request(
                 request_id,
                 prompt,
                 sampling_params,
@@ -221,79 +220,42 @@ async def generate(
                 trace_headers=trace_headers,
                 prompt_adapter_request=prompt_adapter_request,
                 priority=priority,
-        ):
-            yield output
-
-    def _finish_stream(self, request_id: str):
-        stream = self.request_streams.pop(request_id, None)
-        if stream is not None:
-            stream.finish()
-
-    def _add_request_to_streams(
-        self,
-        request_id: str,
-    ) -> AsyncStream:
-
-        if request_id in self.request_streams:
-            raise ValueError(f"Request id {request_id} already running.")
-
-        # Avoid streams having circular ref to parent AsyncLLM object.
-        aborted_reqs = self.client_aborted_requests
-        stream = AsyncStream(request_id, aborted_reqs.append)
-        self.request_streams[request_id] = stream
-
-        if self.log_requests:
-            logger.info("Added request %s.", request_id)
+            )
 
-        return stream
-
-    async def _process_cancellations(self) -> None:
-        """
-        Process requests cancelled from user disconnecting.
-
-        When a client disconnects, AsyncStream._cancel() is called.
-        We passed a callback to AsyncStream(), which appends to
-        self.client_aborted_requests.
-
-        As a result, if any requests are canceled from the user side
-        the request_id will show up in self.client_aborted_requests.
-        """
-
-        # Avoid streams having circular ref to parent AsyncLLM object.
-        if not self.client_aborted_requests:
-            return
-        reqs_to_abort = self.client_aborted_requests.copy()
-        self.client_aborted_requests.clear()
-
-        # Remove from Detokenizer.
-        self.detokenizer.abort_requests(reqs_to_abort)
-
-        # Remove from RequestStreams.
-        for request_id in reqs_to_abort:
-            if self.log_requests:
-                logger.info("User-cancelled request %s.", request_id)
-            self._finish_stream(request_id)
-
-        # Remove from EngineCore.
-        await self.engine_core.abort_requests_async(reqs_to_abort)
+            # The output_handler task pushes items into the queue.
+            # This task pulls from the queue and yields to caller.
+            while True:
+                # Note: drain queue without await if possible (avoids
+                # task switching under load which helps performance).
+                out = q.get_nowait() if q.qsize() > 0 else await q.get()
+
+                # Note: both Detokenizer and EngineCore handle their
+                # own request cleanup based on finished.
+                if out.finished:
+                    del self.rid_to_queue[request_id]
+                    yield out
+                    break
+
+                yield out
+
+        # If the request is disconnected by the client, the
+        # generate() task will be canceled. So, we abort the
+        # request if we end up here.
+        except asyncio.CancelledError:
+            await self.abort(request_id)
+            raise
 
     def _process_request_outputs(self, request_outputs: List[RequestOutput]):
-        """Process outputs by putting them into per-request AsyncStreams."""
+        """Process outputs by putting them into per-request queues."""
         for request_output in request_outputs:
            request_id = request_output.request_id
-            assert request_id in self.request_streams
 
-            # Each request in the API server pulls from the per-request stream.
-            stream = self.request_streams.get(request_id)
-            if stream is not None:
-                stream.put(request_output)
-
-                # If finished, remove from the tracker.
-                if request_output.finished:
-                    if self.log_requests:
-                        logger.info("Finished request %s.", request_id)
-                    self._finish_stream(request_id)
+            # Note: it is possible a request was aborted and removed from
+            # the state due to client cancellations, so if we encounter a
+            # request id not in the state, we skip.
+            if request_id in self.rid_to_queue:
+                self.rid_to_queue[request_id].put_nowait(request_output)
 
     async def _run_output_handler(self):
         """Background loop: pulls from EngineCore and pushes to AsyncStreams."""
@@ -306,24 +268,27 @@ async def _run_output_handler(self):
                 # 2) Detokenize based on the output.
                 request_outputs, reqs_to_abort = self.detokenizer.step(outputs)
 
-                # 3) Put the RequestOutputs into the per-request AsyncStreams.
+                # 3) Put the RequestOutputs into the per-request queues.
                 self._process_request_outputs(request_outputs)
 
                 # 4) Abort any requests that finished due to stop strings.
                 await self.engine_core.abort_requests_async(reqs_to_abort)
 
-                # 5) Abort any requests due to client cancellations.
-                await self._process_cancellations()
-
         except BaseException as e:
             logger.error(e)
             raise e
 
-    # TODO: can we eliminate these?
-
     async def abort(self, request_id: str) -> None:
-        # Note: Who Calls this? I dont think this is actually used.
-        raise ValueError("Not Supported on V1 yet.")
+        """Abort RequestId in self, detokenizer, and engine core."""
+
+        request_ids = [request_id]
+        await self.engine_core.abort_requests_async(request_ids)
+        self.detokenizer.abort_requests(request_ids)
+
+        # If a request finishes while we await then the request_id
+        # will be removed from the tracked queues before we get here.
+        if request_id in self.rid_to_queue:
+            del self.rid_to_queue[request_id]
 
     def encode(
         self,
diff --git a/vllm/v1/engine/async_stream.py b/vllm/v1/engine/async_stream.py
deleted file mode 100644
index 35449238c3259..0000000000000
--- a/vllm/v1/engine/async_stream.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import asyncio
-from typing import Any, AsyncGenerator, Callable, Optional, Type, Union
-
-from vllm.outputs import PoolingRequestOutput, RequestOutput
-
-
-class AsyncStream:
-    """A stream of RequestOutputs or PoolingRequestOutputs for a request
-    that can be iterated over asynchronously via an async generator."""
-
-    STOP_ITERATION = Exception()  # Sentinel
-
-    def __init__(self, request_id: str, cancel: Callable[[str], None]) -> None:
-        self.request_id = request_id
-        self._cancel = cancel
-        self._queue: asyncio.Queue = asyncio.Queue()
-        self._finished = False
-
-    def put(self, item: Union[RequestOutput, PoolingRequestOutput,
-                              Exception]) -> None:
-        if not self._finished:
-            self._queue.put_nowait(item)
-
-    def finish(
-        self,
-        exception: Optional[Union[BaseException, Type[BaseException]]] = None,
-    ) -> None:
-        if not self._finished:
-            self._finished = True
-            self._queue.put_nowait(exception if self._is_raisable(exception)
-                                   else AsyncStream.STOP_ITERATION)
-
-    async def generator(
-        self
-    ) -> AsyncGenerator[Union[RequestOutput, PoolingRequestOutput], None]:
-        finished = False
-        try:
-            while True:
-                result = await self._queue.get()
-                if self._is_raisable(result):
-                    finished = True
-                    if result == AsyncStream.STOP_ITERATION:
-                        return
-                    raise result
-                yield result
-        finally:
-            self._finished = True
-            if not finished:
-                self._cancel(self.request_id)
-
-    @staticmethod
-    def _is_raisable(value: Any):
-        return isinstance(value, BaseException) or \
-            (isinstance(value, type) and \
-             issubclass(value, BaseException))
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index 497d5db5b4c99..0aef61fc7f680 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -32,7 +32,7 @@
 POLLING_TIMEOUT_MS = 5000
 POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
 
-LOGGING_TIME_S = 5000
+LOGGING_TIME_S = POLLING_TIMEOUT_S
 
 
 class EngineCore:
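For context, here is a minimal, self-contained sketch (not vLLM code) of the queue hand-off pattern the async_llm.py changes adopt: a background producer pushes outputs into a per-request asyncio.Queue with put_nowait(), and the consumer drains with get_nowait() whenever items are already buffered, awaiting only when the queue is empty and stopping on a finished output. The names DummyOutput, producer, and consumer are illustrative assumptions, not part of the vLLM API.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class DummyOutput:
    # Stand-in for vllm.outputs.RequestOutput; only the fields the
    # hand-off pattern needs.
    request_id: str
    text: str
    finished: bool = False


async def producer(queue: asyncio.Queue) -> None:
    # Plays the role of the background output_handler task: it pushes
    # outputs into the per-request queue without awaiting.
    for i in range(3):
        queue.put_nowait(DummyOutput("req-0", f"token-{i}"))
        await asyncio.sleep(0)  # yield control, as the real loop would
    queue.put_nowait(DummyOutput("req-0", "token-final", finished=True))


async def consumer(queue: asyncio.Queue) -> None:
    # Mirrors the new generate() loop: drain synchronously with
    # get_nowait() when items are already queued, otherwise await,
    # and stop once an output is marked finished.
    while True:
        out = queue.get_nowait() if queue.qsize() > 0 else await queue.get()
        print(out.text)
        if out.finished:
            break


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))


if __name__ == "__main__":
    asyncio.run(main())
```

Draining without awaiting when items are already queued avoids an event-loop task switch per output under load, which is the rationale stated in the comments added to generate().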
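The try/except around the new generate() loop relies on asyncio cancellation to detect client disconnects. The sketch below, again using placeholder names rather than vLLM's classes, shows that pattern in isolation: cancelling the task that consumes the async generator raises CancelledError inside the generator, the per-request state is cleaned up via an abort() stand-in, and the exception is re-raised.

```python
import asyncio
from typing import Dict

# Per-request output queues, analogous to AsyncLLM.rid_to_queue.
rid_to_queue: Dict[str, asyncio.Queue] = {}


async def abort(request_id: str) -> None:
    # Stand-in for AsyncLLM.abort(): the real method also tells the
    # detokenizer and engine core to drop the request; here we only
    # clear local state.
    rid_to_queue.pop(request_id, None)
    print(f"aborted {request_id}")


async def generate(request_id: str):
    rid_to_queue[request_id] = asyncio.Queue()
    try:
        while True:
            out = await rid_to_queue[request_id].get()
            yield out
    except asyncio.CancelledError:
        # A client disconnect cancels the consuming task; clean up
        # server-side state before re-raising.
        await abort(request_id)
        raise


async def main() -> None:
    agen = generate("req-0")
    task = asyncio.create_task(agen.__anext__())
    await asyncio.sleep(0.01)  # let the generator block on queue.get()
    task.cancel()              # simulate the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert "req-0" not in rid_to_queue


if __name__ == "__main__":
    asyncio.run(main())
```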