
[V1] [6/N] API Server: Better Shutdown #11586

Merged Dec 30, 2024 (13 commits)
44 changes: 12 additions & 32 deletions vllm/entrypoints/openai/api_server.py
@@ -68,7 +68,7 @@
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
 from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
-                        is_valid_ipv6_address, kill_process_tree, set_ulimit)
+                        is_valid_ipv6_address, set_ulimit)
 from vllm.version import __version__ as VLLM_VERSION
 
 TIMEOUT_KEEP_ALIVE = 5  # seconds
@@ -133,32 +133,21 @@ async def build_async_engine_client_from_engine_args(
     Returns the Client or None if the creation failed.
     """
 
-    # Fall back
-    # TODO: fill out feature matrix.
+    # AsyncLLMEngine.
     if (MQLLMEngineClient.is_unsupported_config(engine_args)
             or envs.VLLM_USE_V1 or disable_frontend_multiprocessing):
-        engine_config = engine_args.create_engine_config(
-            UsageContext.OPENAI_API_SERVER)
-        uses_ray = getattr(AsyncLLMEngine._get_executor_cls(engine_config),
-                           "uses_ray", False)
-
-        build_engine = partial(AsyncLLMEngine.from_engine_args,
-                               engine_args=engine_args,
-                               engine_config=engine_config,
-                               usage_context=UsageContext.OPENAI_API_SERVER)
-        if uses_ray:
-            # Must run in main thread with ray for its signal handlers to work
-            engine_client = build_engine()
-        else:
-            engine_client = await asyncio.get_running_loop().run_in_executor(
-                None, build_engine)
-
-        yield engine_client
-        if hasattr(engine_client, "shutdown"):
-            engine_client.shutdown()
-        return
+        engine_client: Optional[EngineClient] = None
+        try:
+            engine_client = AsyncLLMEngine.from_engine_args(
+                engine_args=engine_args,
+                usage_context=UsageContext.OPENAI_API_SERVER)
+            yield engine_client
+        finally:
+            if engine_client and hasattr(engine_client, "shutdown"):
+                engine_client.shutdown()
 
-    # Otherwise, use the multiprocessing AsyncLLMEngine.
+    # MQLLMEngine.
     else:
         if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
             # Make TemporaryDirectory for prometheus multiprocessing
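
The substance of this hunk is that shutdown() now runs in a finally block instead of only on the happy path after yield, so the old code skipped cleanup whenever the caller raised or was cancelled. A minimal runnable sketch of the pattern, using an illustrative FakeEngine and build_engine_client rather than the real vLLM APIs:

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional


class FakeEngine:
    """Stand-in for AsyncLLMEngine; only the shutdown contract matters."""

    def shutdown(self) -> None:
        print("engine shut down")


@asynccontextmanager
async def build_engine_client() -> AsyncIterator[FakeEngine]:
    engine_client: Optional[FakeEngine] = None
    try:
        engine_client = FakeEngine()
        yield engine_client
    finally:
        # Runs on normal exit, exceptions, and task cancellation alike,
        # unlike the old code, which only cleaned up on the happy path.
        if engine_client and hasattr(engine_client, "shutdown"):
            engine_client.shutdown()


async def main() -> None:
    async with build_engine_client() as client:
        print("serving with", type(client).__name__)


asyncio.run(main())

Because asynccontextmanager runs the finally clause on __aexit__, the engine is torn down even when the server loop exits via an exception.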
@@ -737,15 +726,6 @@ def signal_handler(*_) -> None:
 
     signal.signal(signal.SIGTERM, signal_handler)
 
-    # The child processes will send SIGQUIT to this process when
-    # any error happens. This process then clean up the whole tree.
-    # TODO(rob): move this into AsyncLLM.__init__ once we remove
-    # the context manager below.
-    def sigquit_handler(signum, frame):
-        kill_process_tree(os.getpid())
-
-    signal.signal(signal.SIGQUIT, sigquit_handler)
-
     async with build_async_engine_client(args) as engine_client:
         app = build_app(args)
27 changes: 24 additions & 3 deletions vllm/v1/engine/async_llm.py
@@ -1,4 +1,6 @@
 import asyncio
+import os
+import signal
 from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union
 
 from vllm.config import ModelConfig, VllmConfig
@@ -16,6 +18,7 @@
 from vllm.transformers_utils.tokenizer import AnyTokenizer
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
+from vllm.utils import get_exception_traceback, kill_process_tree
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor
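
get_exception_traceback and kill_process_tree are imported above but their bodies are not part of this diff. A plausible sketch of a process-tree killer, assuming a psutil-based implementation (an assumption for illustration, not the verbatim vllm.utils code):

import contextlib
import os
import signal

import psutil


def kill_process_tree(pid: int) -> None:
    """SIGKILL every descendant of `pid`, then `pid` itself (assumed shape)."""
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return
    # Kill children first so none of them outlive the parent.
    for child in parent.children(recursive=True):
        with contextlib.suppress(ProcessLookupError):
            os.kill(child.pid, signal.SIGKILL)
    with contextlib.suppress(ProcessLookupError):
        os.kill(pid, signal.SIGKILL)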
@@ -38,6 +41,22 @@ def __init__(
         log_requests: bool = True,
         start_engine_loop: bool = True,
     ) -> None:
+
+        # The child processes will send SIGQUIT when unrecoverable
+        # errors happen. We kill the process tree here so that the
+        # stack trace is very evident.
+        # TODO: rather than killing the main process, we should
+        # figure out how to raise an AsyncEngineDeadError and
+        # handle at the API server level so we can return a better
+        # error code to the clients calling VLLM.
+        def sigquit_handler(signum, frame):
+            logger.fatal(
+                "AsyncLLM got SIGQUIT from worker processes, shutting "
+                "down. See stack trace above for root cause issue.")
+            kill_process_tree(os.getpid())
+
+        signal.signal(signal.SIGQUIT, sigquit_handler)
+
         assert start_engine_loop
 
         self.log_requests = log_requests
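
The handler installed above is only half of the contract: worker processes must actually deliver SIGQUIT to the parent on fatal errors. A toy demonstration of that child-to-parent signal path (the worker body is invented for illustration, POSIX-only; real vLLM workers are not shown in this diff):

import os
import signal
import sys
import time
from multiprocessing import Process


def worker() -> None:
    try:
        raise RuntimeError("unrecoverable worker error")
    except RuntimeError:
        # Tell the parent the engine is dead beyond recovery.
        os.kill(os.getppid(), signal.SIGQUIT)


def main() -> None:
    def sigquit_handler(signum, frame):
        print("parent got SIGQUIT from a worker; tearing down")
        sys.exit(1)

    # Signal handlers can only be installed from the main thread,
    # which AsyncLLM.__init__ runs on in the API server process.
    signal.signal(signal.SIGQUIT, sigquit_handler)

    p = Process(target=worker)
    p.start()
    p.join()
    time.sleep(0.5)  # give the pending signal a chance to be delivered


if __name__ == "__main__":
    main()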
@@ -273,10 +292,12 @@ async def _run_output_handler(self):
 
                 # 4) Abort any requests that finished due to stop strings.
                 await self.engine_core.abort_requests_async(reqs_to_abort)
+                raise ValueError("my error!")
Collaborator: remove this?

Collaborator (Author): my b


-        except BaseException as e:
-            logger.error(e)
-            raise e
+        except Exception:
+            traceback = get_exception_traceback()
+            logger.error("EngineCore hit an exception: %s", traceback)
Collaborator: logger.error should automatically print traceback?

Collaborator (Author): yep, thanks

+            kill_process_tree(os.getpid())
 
     async def abort(self, request_id: str) -> None:
         """Abort RequestId in self, detokenizer, and engine core."""