[V1] [2/n] Logging and Metrics - OutputProcessor Abstraction #11973

Merged
Commits
70 commits
cfa8c2b
added code
robertgshaw2-redhat Jan 11, 2025
6d8e4f3
fixed
robertgshaw2-redhat Jan 11, 2025
c78a56f
fixed
robertgshaw2-redhat Jan 11, 2025
7b39705
updated
robertgshaw2-redhat Jan 11, 2025
6e9cd1c
updated
robertgshaw2-redhat Jan 11, 2025
2657b7f
fixed
robertgshaw2-redhat Jan 11, 2025
249b9ff
updated
robertgshaw2-redhat Jan 11, 2025
c1f9292
refactoring metrics
robertgshaw2-redhat Jan 11, 2025
c641866
updated
robertgshaw2-redhat Jan 12, 2025
1ce7a5f
updated
robertgshaw2-redhat Jan 12, 2025
c1baa6d
Merge branch 'v1-metrics' into v1-metrics-2
robertgshaw2-redhat Jan 12, 2025
f8de299
added output processor
robertgshaw2-redhat Jan 12, 2025
49ca9bb
added all files
robertgshaw2-redhat Jan 12, 2025
86d33a1
stash
robertgshaw2-redhat Jan 12, 2025
4066fc8
working again
robertgshaw2-redhat Jan 12, 2025
5ef374c
Merge branch 'v1-metrics' into v1-metrics-2
robertgshaw2-redhat Jan 12, 2025
c9ffc60
fixed sorting
robertgshaw2-redhat Jan 12, 2025
5f3f3b7
Merge branch 'main' into v1-metrics-2
robertgshaw2-redhat Jan 12, 2025
e34b9dc
merged
robertgshaw2-redhat Jan 12, 2025
dd6e3d6
reduce number of changes
robertgshaw2-redhat Jan 12, 2025
dbd86b8
reduce changes
robertgshaw2-redhat Jan 12, 2025
ebf3530
reduce changes
robertgshaw2-redhat Jan 12, 2025
7b6d9b3
updated
robertgshaw2-redhat Jan 12, 2025
707796f
make pr more reviewable
robertgshaw2-redhat Jan 12, 2025
df72c8f
update comments
robertgshaw2-redhat Jan 12, 2025
9d67efc
make PR more readable
robertgshaw2-redhat Jan 12, 2025
1cae783
reduce cruft
robertgshaw2-redhat Jan 12, 2025
6401cfa
reduce changes
robertgshaw2-redhat Jan 12, 2025
33bc01d
reduce changes
robertgshaw2-redhat Jan 12, 2025
7dda305
updated
robertgshaw2-redhat Jan 12, 2025
769cff5
reduce changes
robertgshaw2-redhat Jan 12, 2025
b1b4c47
minor cleanups
robertgshaw2-redhat Jan 12, 2025
2f916d1
clean up
robertgshaw2-redhat Jan 12, 2025
6a5f245
updated
robertgshaw2-redhat Jan 12, 2025
9ea36c8
updated
robertgshaw2-redhat Jan 12, 2025
318c203
reduce changes
robertgshaw2-redhat Jan 12, 2025
3746183
reduce LOC changes
robertgshaw2-redhat Jan 12, 2025
449405b
updated
robertgshaw2-redhat Jan 12, 2025
79f2f5f
remove file
robertgshaw2-redhat Jan 12, 2025
a16d27f
updated
robertgshaw2-redhat Jan 12, 2025
19372f9
reduce LOC changes
robertgshaw2-redhat Jan 12, 2025
39be503
updated
robertgshaw2-redhat Jan 12, 2025
833f028
updated
robertgshaw2-redhat Jan 12, 2025
ef2c3f9
updated
robertgshaw2-redhat Jan 12, 2025
33303fc
updated
robertgshaw2-redhat Jan 12, 2025
edae5d2
updated
robertgshaw2-redhat Jan 12, 2025
a20c7b5
updated
robertgshaw2-redhat Jan 12, 2025
b7e5a91
updated
robertgshaw2-redhat Jan 12, 2025
9353010
fixed
robertgshaw2-redhat Jan 12, 2025
94de9f5
cleanup
robertgshaw2-redhat Jan 12, 2025
2ea4283
revert abort test
robertgshaw2-redhat Jan 12, 2025
b9683d1
updated
robertgshaw2-redhat Jan 12, 2025
92c3b0c
stash
robertgshaw2-redhat Jan 12, 2025
a985a73
added logging and comment
robertgshaw2-redhat Jan 12, 2025
6c36d87
starting to fix tests - stash
robertgshaw2-redhat Jan 12, 2025
595fd12
updated tests
robertgshaw2-redhat Jan 12, 2025
5ecfe8e
make tests pass
robertgshaw2-redhat Jan 12, 2025
5f37918
reduce LOC changes
robertgshaw2-redhat Jan 12, 2025
1d9b233
updated
robertgshaw2-redhat Jan 12, 2025
2880962
added IterationStats test
robertgshaw2-redhat Jan 13, 2025
7de7c00
codespell
robertgshaw2-redhat Jan 13, 2025
eec573c
add comment about invariant
robertgshaw2-redhat Jan 13, 2025
0427e03
updated
robertgshaw2-redhat Jan 13, 2025
9b49133
tweak
robertgshaw2-redhat Jan 13, 2025
bffa5d0
formatting and added test
robertgshaw2-redhat Jan 13, 2025
605c5f0
passing
robertgshaw2-redhat Jan 13, 2025
d0013a4
ruff ruff
robertgshaw2-redhat Jan 13, 2025
e01d236
format
robertgshaw2-redhat Jan 13, 2025
a53f089
run isort
robertgshaw2-redhat Jan 13, 2025
3e45fc6
undo fat finger
robertgshaw2-redhat Jan 13, 2025
69 changes: 58 additions & 11 deletions tests/v1/engine/test_async_llm.py
@@ -1,5 +1,5 @@
import asyncio
from typing import Tuple
from typing import List, Tuple

import pytest

@@ -13,6 +13,7 @@
allow_module_level=True)

ENGINE_ARGS = AsyncEngineArgs(model="meta-llama/Llama-3.2-1B",
enforce_eager=True,
disable_log_requests=True)


@@ -53,17 +54,63 @@ async def test_load(monkeypatch):
generate(engine, request_id, NUM_EXPECTED_TOKENS)))

# Confirm that we got all the EXPECTED tokens from the requests.
failed_request_id = None
tokens = None
for task in tasks:
num_generated_tokens, request_id = await task
if (num_generated_tokens != NUM_EXPECTED_TOKENS
and failed_request_id is None):
failed_request_id = request_id
tokens = num_generated_tokens

assert failed_request_id is None, (
f"{failed_request_id} generated {tokens} but "
f"expected {NUM_EXPECTED_TOKENS}")
assert num_generated_tokens == NUM_EXPECTED_TOKENS, (
f"{request_id} generated {num_generated_tokens} but "
f"expected {NUM_EXPECTED_TOKENS}")

assert not engine.output_processor.has_unfinished_requests()
engine.shutdown()


@pytest.mark.asyncio
async def test_abort(monkeypatch):

with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "1")

engine = AsyncLLM.from_engine_args(ENGINE_ARGS)

NUM_REQUESTS = 100
NUM_EXPECTED_TOKENS = 100
REQUEST_IDS_TO_ABORT = range(1, 100, 10)

request_ids = [f"request-{i}" for i in range(NUM_REQUESTS)]

# Create concurrent requests.
tasks: List[asyncio.Task] = []
for request_id in request_ids:
tasks.append(
asyncio.create_task(
generate(engine, request_id, NUM_EXPECTED_TOKENS)))

# API server cancels requests when they disconnect.
for idx in REQUEST_IDS_TO_ABORT:
tasks[idx].cancel()
await asyncio.sleep(0.1)

# Confirm the other requests are okay.
for idx, task in enumerate(tasks):
# Confirm that it was actually canceled.
if idx in REQUEST_IDS_TO_ABORT:
with pytest.raises(asyncio.CancelledError):
await task
else:
# Otherwise, make sure the request was not impacted.
num_generated_tokens, request_id = await task
assert num_generated_tokens == NUM_EXPECTED_TOKENS, (
f"{request_id} generated {num_generated_tokens} but "
f"expected {NUM_EXPECTED_TOKENS}")

assert not engine.output_processor.has_unfinished_requests()

# Confirm we can do another generation.
request_id = f"request-{REQUEST_IDS_TO_ABORT[0]}"
task = asyncio.create_task(
generate(engine, request_id, NUM_EXPECTED_TOKENS))
num_generated_tokens, request_id = await task
assert num_generated_tokens == NUM_EXPECTED_TOKENS
assert not engine.output_processor.has_unfinished_requests()

engine.shutdown()
Second changed file (filename not shown in this extract):
@@ -3,11 +3,18 @@
import pytest
from transformers import AutoTokenizer

from vllm.engine.arg_utils import EngineArgs
from vllm.sampling_params import RequestOutputKind, SamplingParams
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.output_processor import OutputProcessor

TOKENIZER_NAME = "mistralai/Mistral-7B-Instruct-v0.3"
VLLM_CONFIG = EngineArgs(model=TOKENIZER_NAME).create_engine_config()
TOKENIZER_GROUP = init_tokenizer_from_configs(VLLM_CONFIG.model_config,
VLLM_CONFIG.scheduler_config,
VLLM_CONFIG.parallel_config,
VLLM_CONFIG.lora_config)
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)

FULL_STRINGS = [
@@ -66,7 +73,7 @@ def get_outputs(self) -> List[EngineCoreOutput]:
"request_output_kind",
[RequestOutputKind.DELTA, RequestOutputKind.FINAL_ONLY])
def test_incremental_detokenization(request_output_kind: RequestOutputKind):
detokenizer = Detokenizer(TOKENIZER_NAME)
output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=False)
engine_core = MockEngineCore(GENERATION_TOKENS)

# Make N requests.
@@ -93,7 +100,7 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind):

# Add requests to the detokenizer.
for request in requests:
detokenizer.add_request(request)
output_processor.add_request(request)

gen_strings = {}
gen_tokens = {}
@@ -104,7 +111,9 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind):
break

# Step the Detokenizer.
request_outputs, requests_to_abort = detokenizer.step(outputs)
processed_outputs = output_processor.process_outputs(outputs, )
request_outputs = processed_outputs.request_outputs
requests_to_abort = processed_outputs.reqs_to_abort
assert len(requests_to_abort) == 0

# Update tracking.
@@ -128,41 +137,35 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind):
assert gen_str == ref_gen_str, f"{gen_str=}, {ref_gen_str=}"
assert gen_toks == ref_gen_toks, f"{gen_toks=}, {ref_gen_toks=}"

assert detokenizer.get_num_unfinished_requests() == 0
assert not detokenizer.has_unfinished_requests()
assert output_processor.get_num_unfinished_requests() == 0
assert not output_processor.has_unfinished_requests()


@pytest.mark.parametrize("include_stop_str_in_output", [True, False])
def test_stop_string(include_stop_str_in_output: bool):
detokenizer = Detokenizer(TOKENIZER_NAME)
output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=False)
engine_core = MockEngineCore(GENERATION_TOKENS)

# Make N requests.
requests = [
EngineCoreRequest(
request_id=f"request-{idx}",
prompt=prompt,
prompt_token_ids=prompt_tokens,
arrival_time=0,
mm_inputs=None,
mm_hashes=None,
mm_placeholders=None,
eos_token_id=None,
lora_request=None,
sampling_params=SamplingParams(
skip_special_tokens=False,
spaces_between_special_tokens=False,
output_kind=RequestOutputKind.DELTA,
stop=STOP_STRINGS,
include_stop_str_in_output=include_stop_str_in_output,
)) for idx, (
prompt,
prompt_tokens) in enumerate(zip(PROMPT_STRINGS, PROMPT_TOKENS))
EngineCoreRequest(request_id=f"request-{idx}",
prompt=prompt,
prompt_token_ids=prompt_tokens,
arrival_time=0,
mm_inputs=None,
mm_hashes=None,
mm_placeholders=None,
eos_token_id=None,
lora_request=None,
sampling_params=SamplingParams())
for idx, (
prompt,
prompt_tokens) in enumerate(zip(PROMPT_STRINGS, PROMPT_TOKENS))
]

# Add requests to the detokenizer.
for request in requests:
detokenizer.add_request(request)
output_processor.add_request(request)

gen_strings = {}
aborted = []
@@ -173,7 +176,9 @@ def test_stop_string(include_stop_str_in_output: bool):
break

# Step the Detokenizer.
request_outputs, requests_to_abort = detokenizer.step(outputs)
processed_outputs = output_processor.process_outputs(outputs)
request_outputs = processed_outputs.request_outputs
requests_to_abort = processed_outputs.reqs_to_abort
for request_output in request_outputs:
# If aborted, we should not get a request output.
assert request_output.request_id not in aborted
@@ -214,5 +219,71 @@ def test_stop_string(include_stop_str_in_output: bool):
assert gen_str == ref_str_exc_stop, (
f"{gen_str=}, {ref_str_exc_stop=}")

assert detokenizer.get_num_unfinished_requests() == 0
assert not detokenizer.has_unfinished_requests()
assert output_processor.get_num_unfinished_requests() == 0
assert not output_processor.has_unfinished_requests()


def test_iteration_stats():
output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=True)
engine_core = MockEngineCore(GENERATION_TOKENS)

# Make N requests.
requests = [
EngineCoreRequest(
request_id=f"request-{idx}",
prompt=prompt,
prompt_token_ids=prompt_tokens,
arrival_time=0,
mm_inputs=None,
mm_hashes=None,
mm_placeholders=None,
eos_token_id=None,
lora_request=None,
sampling_params=SamplingParams(),
) for idx, (
prompt,
prompt_tokens) in enumerate(zip(PROMPT_STRINGS, PROMPT_TOKENS))
]

# Add all requests except one to the OutputProcessor.
num_active = len(GENERATION_TOKENS) - 1
for request in requests[:num_active]:
output_processor.add_request(request)
inactive_request = requests[num_active]

# First iteration has 2 prefills.
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats
total_prompt_tokens = sum(
[len(prompt_tokens) for prompt_tokens in PROMPT_TOKENS[:num_active]])

assert iteration_stats.num_prompt_tokens == total_prompt_tokens
assert iteration_stats.num_generation_tokens == num_active

# Just decodes in this step.
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats

assert iteration_stats.num_prompt_tokens == 0
assert iteration_stats.num_generation_tokens == num_active

# Add a new request - prefill and 2 decodes in this step.
output_processor.add_request(inactive_request)
num_active += 1
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats
total_prompt_tokens = len(PROMPT_TOKENS[num_active - 1])

assert iteration_stats.num_prompt_tokens == total_prompt_tokens
assert iteration_stats.num_generation_tokens == num_active

# Just decodes in this step.
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats

assert iteration_stats.num_prompt_tokens == 0
assert iteration_stats.num_generation_tokens == num_active
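
For reviewers, here is a minimal sketch of the OutputProcessor flow that the tests above exercise. It only uses names that appear in this diff (OutputProcessor, add_request, process_outputs, request_outputs, reqs_to_abort, iteration_stats, has_unfinished_requests); the tokenizer group, the requests list, and the engine-core mock are assumed to be built exactly as in the test file, so treat this as an illustration rather than the PR's implementation.

from vllm.v1.engine.output_processor import OutputProcessor

# Assumed fixtures from the test file above: TOKENIZER_GROUP, requests,
# and engine_core (the MockEngineCore used in the tests).
output_processor = OutputProcessor(TOKENIZER_GROUP, log_stats=True)

# Register requests before their outputs start arriving from the engine core.
for request in requests:
    output_processor.add_request(request)

while True:
    outputs = engine_core.get_outputs()
    if len(outputs) == 0:
        break

    # Detokenize this iteration's outputs and apply stop conditions.
    processed_outputs = output_processor.process_outputs(outputs)

    # RequestOutputs ready to hand back to callers (e.g. the API server).
    for request_output in processed_outputs.request_outputs:
        print(request_output.request_id, request_output.outputs[0].text)

    # Requests that finished inside the OutputProcessor (e.g. on a stop
    # string) and still need to be aborted in the engine core.
    reqs_to_abort = processed_outputs.reqs_to_abort

    # Per-iteration stats; populated because log_stats=True was passed.
    iteration_stats = processed_outputs.iteration_stats
    print(iteration_stats.num_prompt_tokens,
          iteration_stats.num_generation_tokens)

# Mirrors the tests: all tracked requests should be finished at this point.
assert not output_processor.has_unfinished_requests()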