[V1] [2/n] Logging and Metrics - OutputProcessor Abstraction #11973

Status: Merged

Commits (70, all by robertgshaw2-redhat; the diff below shows changes from 51 commits):
cfa8c2b  added code (Jan 11, 2025)
6d8e4f3  fixed (Jan 11, 2025)
c78a56f  fixed (Jan 11, 2025)
7b39705  updated (Jan 11, 2025)
6e9cd1c  updated (Jan 11, 2025)
2657b7f  fixed (Jan 11, 2025)
249b9ff  updated (Jan 11, 2025)
c1f9292  refactoring metrics (Jan 11, 2025)
c641866  updated (Jan 12, 2025)
1ce7a5f  updated (Jan 12, 2025)
c1baa6d  Merge branch 'v1-metrics' into v1-metrics-2 (Jan 12, 2025)
f8de299  added output processor (Jan 12, 2025)
49ca9bb  added all files (Jan 12, 2025)
86d33a1  stash (Jan 12, 2025)
4066fc8  working again (Jan 12, 2025)
5ef374c  Merge branch 'v1-metrics' into v1-metrics-2 (Jan 12, 2025)
c9ffc60  fixed sorting (Jan 12, 2025)
5f3f3b7  Merge branch 'main' into v1-metrics-2 (Jan 12, 2025)
e34b9dc  merged (Jan 12, 2025)
dd6e3d6  reduce number of changes (Jan 12, 2025)
dbd86b8  reduce changes (Jan 12, 2025)
ebf3530  reduce changes (Jan 12, 2025)
7b6d9b3  updared (Jan 12, 2025)
707796f  make pr more reviewable (Jan 12, 2025)
df72c8f  update comments (Jan 12, 2025)
9d67efc  make PR more readable (Jan 12, 2025)
1cae783  reduce cruft (Jan 12, 2025)
6401cfa  reduce changes (Jan 12, 2025)
33bc01d  reduce changes (Jan 12, 2025)
7dda305  updated (Jan 12, 2025)
769cff5  reduce changes (Jan 12, 2025)
b1b4c47  minor cleanups (Jan 12, 2025)
2f916d1  clean up (Jan 12, 2025)
6a5f245  updated (Jan 12, 2025)
9ea36c8  updated (Jan 12, 2025)
318c203  reduce changes (Jan 12, 2025)
3746183  reduce LOC changes (Jan 12, 2025)
449405b  updated (Jan 12, 2025)
79f2f5f  remove file (Jan 12, 2025)
a16d27f  updated (Jan 12, 2025)
19372f9  reduce LOC changes (Jan 12, 2025)
39be503  updated (Jan 12, 2025)
833f028  updated (Jan 12, 2025)
ef2c3f9  updated (Jan 12, 2025)
33303fc  updated (Jan 12, 2025)
edae5d2  updated (Jan 12, 2025)
a20c7b5  updated (Jan 12, 2025)
b7e5a91  updated (Jan 12, 2025)
9353010  fixed (Jan 12, 2025)
94de9f5  cleanup (Jan 12, 2025)
2ea4283  revert abort test (Jan 12, 2025)
b9683d1  updared (Jan 12, 2025)
92c3b0c  stash (Jan 12, 2025)
a985a73  added logging and comment (Jan 12, 2025)
6c36d87  starting to fix tests - stash (Jan 12, 2025)
595fd12  updated tests (Jan 12, 2025)
5ecfe8e  make tests pass (Jan 12, 2025)
5f37918  reduce LOC changes (Jan 12, 2025)
1d9b233  updated (Jan 12, 2025)
2880962  added IterationStats test (Jan 13, 2025)
7de7c00  codespell (Jan 13, 2025)
eec573c  add comment about invairant (Jan 13, 2025)
0427e03  updated (Jan 13, 2025)
9b49133  tweak (Jan 13, 2025)
bffa5d0  formatting and added test (Jan 13, 2025)
605c5f0  passing (Jan 13, 2025)
d0013a4  ruff ruff (Jan 13, 2025)
e01d236  format (Jan 13, 2025)
a53f089  run isort (Jan 13, 2025)
3e45fc6  undo fat finger (Jan 13, 2025)
86 changes: 33 additions & 53 deletions vllm/v1/engine/async_llm.py
@@ -1,6 +1,6 @@
 import asyncio
 import os
-from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union
+from typing import AsyncGenerator, List, Mapping, Optional, Type, Union
 
 from vllm.config import ModelConfig, VllmConfig
 from vllm.engine.arg_utils import AsyncEngineArgs
@@ -18,11 +18,11 @@
 from vllm.usage.usage_lib import UsageContext
 from vllm.utils import kill_process_tree
 from vllm.v1.engine.core_client import EngineCoreClient
-from vllm.v1.engine.detokenizer import Detokenizer
+from vllm.v1.engine.output_processor import OutputProcessor
 from vllm.v1.engine.processor import Processor
 from vllm.v1.executor.abstract import Executor
 from vllm.v1.metrics.loggers import LoggingStatLogger, StatLoggerBase
-from vllm.v1.metrics.stats import SchedulerStats
+from vllm.v1.metrics.stats import IterationStats, SchedulerStats
 
 logger = init_logger(__name__)
@@ -59,9 +59,6 @@ def __init__(
             lora_config=vllm_config.lora_config)
         self.tokenizer.ping()
 
-        # Request streams (map of request_id -> queue).
-        self.rid_to_queue: Dict[str, asyncio.Queue] = {}
-

robertgshaw2-redhat (Collaborator, Author) commented: NOTE: these queues are held in OutputProcessor.
         # Processor (converts Inputs --> EngineCoreRequests).
         self.processor = Processor(
             model_config=vllm_config.model_config,
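Following up on the review note above (the per-request queues now live inside the OutputProcessor): a minimal sketch of that ownership change. RequestState and its fields here are illustrative assumptions, not the exact vLLM v1 classes.

import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class RequestState:
    request_id: str
    # None for the sync LLMEngine; an asyncio.Queue for AsyncLLM.
    queue: Optional[asyncio.Queue] = None


class OutputProcessor:
    def __init__(self, tokenizer, log_stats: bool = False):
        self.tokenizer = tokenizer
        self.log_stats = log_stats
        # Map of request_id -> RequestState. The queues move out of
        # AsyncLLM.rid_to_queue and into this map; AsyncLLM.add_request
        # checks membership here to reject duplicate request ids.
        self.request_states: Dict[str, RequestState] = {}

    def add_request(self, request, queue: Optional[asyncio.Queue] = None):
        self.request_states[request.request_id] = RequestState(
            request_id=request.request_id, queue=queue)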
@@ -71,13 +68,9 @@
             input_registry=input_registry,
         )
 
-        # Detokenizer (converts EngineCoreOutputs --> RequestOutput).
-        self.detokenizer = Detokenizer(
-            tokenizer_name=vllm_config.model_config.tokenizer,
-            tokenizer_mode=vllm_config.model_config.tokenizer_mode,
-            trust_remote_code=vllm_config.model_config.trust_remote_code,
-            revision=vllm_config.model_config.tokenizer_revision,
-        )
+        # OutputProcessor (converts EngineCoreOutputs --> RequestOutput).
+        self.output_processor = OutputProcessor(self.tokenizer,
+                                                log_stats=self.log_stats)
 
         # EngineCore (starts the engine in background process).
         self.engine_core = EngineCoreClient.make_client(
@@ -140,9 +133,9 @@ async def add_request(
         """Add new request to the AsyncLLM."""
 
         # 1) Create a new output queue for the request.
-        if request_id in self.rid_to_queue:
+        if request_id in self.output_processor.request_states:
             raise ValueError(f"Request id {request_id} already running.")
-        self.rid_to_queue[request_id] = asyncio.Queue()
+        queue: asyncio.Queue[RequestOutput] = asyncio.Queue()
 
         # 2) Convert Input --> Request.
         request = self.processor.process_inputs(request_id, prompt, params,
Expand All @@ -151,16 +144,16 @@ async def add_request(
prompt_adapter_request,
priority)

# 3) Add the request to Detokenizer (this process).
self.detokenizer.add_request(request)
# 3) Add the request to OutputProcessor (this process).
self.output_processor.add_request(request, queue)

# 4) Add the EngineCoreRequest to EngineCore (separate process).
await self.engine_core.add_request_async(request)

if self.log_requests:
logger.info("Added request %s.", request_id)

return self.rid_to_queue[request_id]
return queue

# TODO: we should support multiple prompts in one call, as you
# can do with LLM.generate. So that for multi-prompt completion
@@ -217,10 +210,9 @@ async def generate(
                 # task switching under load which helps performance).
                 out = q.get_nowait() if q.qsize() > 0 else await q.get()
 
-                # Note: both Detokenizer and EngineCore handle their
+                # Note: both OutputProcessor and EngineCore handle their
                 # own request cleanup based on finished.
                 if out.finished:
-                    del self.rid_to_queue[request_id]
                     yield out
                     break
 
@@ -233,57 +225,46 @@ async def generate(
             await self.abort(request_id)
             raise
 
-    def _process_request_outputs(self, request_outputs: List[RequestOutput]):
-        """Process outputs by putting them into per-request queues."""
-
-        for request_output in request_outputs:
-            request_id = request_output.request_id
-
-            # Note: it is possible a request was aborted and removed from
-            # the state due to client cancellations, so if we encounter a
-            # request id not in the state, we skip.
-            if request_id in self.rid_to_queue:
-                self.rid_to_queue[request_id].put_nowait(request_output)
-

robertgshaw2-redhat (Collaborator, Author) commented on Jan 12, 2025: This logic is moved into the OutputProcessor.process_outputs() loop.
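Continuing the OutputProcessor sketch from earlier in this diff, the moved logic might look roughly like the following. Only the names visible in this PR (request_states, reqs_to_abort, iteration_stats) come from the diff; make_request_output(), the finish_reason value, and the IterationStats stub are illustrative assumptions.

from dataclasses import dataclass
from typing import List


class IterationStats:
    """Stub for the per-step stats object; real fields are assumed."""

    def __init__(self, log_stats: bool):
        self.log_stats = log_stats


@dataclass
class ProcessedOutputs:
    request_outputs: List[object]
    reqs_to_abort: List[str]
    iteration_stats: IterationStats


class OutputProcessor:
    def process_outputs(self, engine_core_outputs) -> ProcessedOutputs:
        request_outputs: List[object] = []
        reqs_to_abort: List[str] = []
        iteration_stats = IterationStats(self.log_stats)

        for engine_core_output in engine_core_outputs:
            req_id = engine_core_output.request_id
            req_state = self.request_states.get(req_id)
            if req_state is None:
                # Request was aborted by the client while outputs were in
                # flight; skip (replaces the old rid_to_queue membership
                # check in _process_request_outputs).
                continue

            # Detokenize and build the RequestOutput (details elided;
            # make_request_output() is a hypothetical helper).
            request_output = req_state.make_request_output(engine_core_output)

            if req_state.queue is not None:
                # AsyncLLM path: push onto the per-request queue.
                req_state.queue.put_nowait(request_output)
            else:
                # Sync LLMEngine path: collect and return as a batch.
                request_outputs.append(request_output)

            if request_output.finished:
                self.request_states.pop(req_id)
                # A stop-string finish is detected here rather than in
                # EngineCore, so EngineCore must still be told to abort.
                if engine_core_output.finish_reason == "stop":
                    reqs_to_abort.append(req_id)

        return ProcessedOutputs(request_outputs, reqs_to_abort,
                                iteration_stats)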

     async def _run_output_handler(self):
         """Background loop: pulls from EngineCore and pushes to AsyncStreams."""
 
         try:
             while True:
-                # 1) Pull EngineCoreOutput from the EngineCore.
+                # 1) Pull EngineCoreOutputs from the EngineCore.
                 outputs = await self.engine_core.get_output_async()
 
-                # 2) Detokenize based on the output.
-                request_outputs, reqs_to_abort = self.detokenizer.step(
+                # 2) Process EngineCoreOutputs.
+                processed_outputs = self.output_processor.process_outputs(
                     outputs.outputs)
 
-                # 3) Put the RequestOutputs into the per-request queues.
-                self._process_request_outputs(request_outputs)
+                # 3) Abort any reqs that finished due to stop strings.
+                await self.engine_core.abort_requests_async(
+                    processed_outputs.reqs_to_abort)
 
-                # 4) Abort any requests that finished due to stop strings.
-                await self.engine_core.abort_requests_async(reqs_to_abort)
-
-                # 5) Log any stats.
-                await self._log_stats(scheduler_stats=outputs.scheduler_stats)
+                # 4) Logging.
+                # TODO(rob): make into a coroutine and launch it in
+                # background thread once we add Prometheus.
+                self._log_stats(
+                    scheduler_stats=outputs.scheduler_stats,
+                    iteration_stats=processed_outputs.iteration_stats,
+                )
 
         except Exception as e:
             logger.exception("EngineCore output handler hit an error: %s", e)
             kill_process_tree(os.getpid())
 
     async def abort(self, request_id: str) -> None:
-        """Abort RequestId in self, detokenizer, and engine core."""
+        """Abort RequestId in OutputProcessor and EngineCore."""
 
         request_ids = [request_id]
         await self.engine_core.abort_requests_async(request_ids)
-        self.detokenizer.abort_requests(request_ids)
-
-        # If a request finishes while we await then the request_id
-        # will be removed from the tracked queues before we get here.
-        if request_id in self.rid_to_queue:
-            del self.rid_to_queue[request_id]
+        self.output_processor.abort_requests(request_ids)
 
-    async def _log_stats(self, scheduler_stats: SchedulerStats):
-        """Log stats to the stat loggers."""
+    def _log_stats(
+        self,
+        scheduler_stats: SchedulerStats,
+        iteration_stats: IterationStats,
+    ):
         if not self.log_stats:
             return
 
@@ -314,8 +295,7 @@ async def get_tokenizer(
         self,
         lora_request: Optional[LoRARequest] = None,
     ) -> AnyTokenizer:
-        assert lora_request is None
-        return self.detokenizer.tokenizer
+        return self.tokenizer.get_lora_tokenizer(lora_request)
 
     async def is_tracing_enabled(self) -> bool:
         return False
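On the logging side of this file's changes, _log_stats now receives both the scheduler snapshot and the per-iteration stats. A self-contained sketch of the fan-out it might perform; the stat field names and the log() signature are assumptions based on this diff's imports, not the exact vllm.v1.metrics API.

from dataclasses import dataclass
from typing import List


@dataclass
class SchedulerStats:
    num_running_reqs: int = 0
    num_waiting_reqs: int = 0


@dataclass
class IterationStats:
    num_prompt_tokens: int = 0
    num_generation_tokens: int = 0


class LoggingStatLogger:
    """Stand-in for vllm.v1.metrics.loggers.LoggingStatLogger."""

    def log(self, scheduler_stats: SchedulerStats,
            iteration_stats: IterationStats) -> None:
        print(f"Running: {scheduler_stats.num_running_reqs} reqs | "
              f"Waiting: {scheduler_stats.num_waiting_reqs} reqs | "
              f"Generated: {iteration_stats.num_generation_tokens} tokens")


def log_stats(stat_loggers: List[LoggingStatLogger],
              scheduler_stats: SchedulerStats,
              iteration_stats: IterationStats) -> None:
    # Mirrors AsyncLLM._log_stats: fan out one (scheduler, iteration)
    # snapshot per engine step to every configured logger.
    for stat_logger in stat_loggers:
        stat_logger.log(scheduler_stats=scheduler_stats,
                        iteration_stats=iteration_stats)


log_stats([LoggingStatLogger()],
          SchedulerStats(num_running_reqs=2, num_waiting_reqs=1),
          IterationStats(num_generation_tokens=8))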
6 changes: 4 additions & 2 deletions vllm/v1/engine/core_client.py
@@ -105,7 +105,8 @@ def add_request(self, request: EngineCoreRequest) -> None:
         self.engine_core.add_request(request)
 
     def abort_requests(self, request_ids: List[str]) -> None:
-        self.engine_core.abort_requests(request_ids)
+        if len(request_ids) > 0:
+            self.engine_core.abort_requests(request_ids)
 
     def shutdown(self):
         self.engine_core.shutdown()
@@ -221,7 +222,8 @@ def add_request(self, request: EngineCoreRequest) -> None:
         self._send_input(EngineCoreRequestType.ADD, request)
 
     def abort_requests(self, request_ids: List[str]) -> None:
-        self._send_input(EngineCoreRequestType.ABORT, request_ids)
+        if len(request_ids) > 0:
+            self._send_input(EngineCoreRequestType.ABORT, request_ids)
 
     def profile(self, is_start: bool = True) -> None:
         self._send_input(EngineCoreRequestType.PROFILE,
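Why the guard matters: with the new flow in async_llm.py, the output handler calls abort_requests_async on every loop iteration with processed_outputs.reqs_to_abort, and that list is empty on most steps. A toy sketch of the effect (stand-in names, not vLLM code):

sent_messages = []


def send_abort(request_ids):
    # Stand-in for _send_input(EngineCoreRequestType.ABORT, request_ids).
    sent_messages.append(list(request_ids))


def abort_requests(request_ids):
    # Same guard as the diff above: empty lists never reach the wire.
    if len(request_ids) > 0:
        send_abort(request_ids)


abort_requests([])          # common case on each handler step: no-op
abort_requests(["req-1"])   # a stop-string finish: ABORT is actually sent
assert sent_messages == [["req-1"]]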