Skip to content

Commit

Permalink
[V1] Improve TP>1 Error Handling + Stack Trace (#11721)
Browse files Browse the repository at this point in the history
Co-authored-by: Tyler Michael Smith <[email protected]>
  • Loading branch information
robertgshaw2-redhat and tlrmchlsmth authored Jan 3, 2025
1 parent 61fed92 commit 1543914
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 21 deletions.
16 changes: 0 additions & 16 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import os
import signal
from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union

from vllm.config import ModelConfig, VllmConfig
Expand Down Expand Up @@ -42,21 +41,6 @@ def __init__(
start_engine_loop: bool = True,
) -> None:

# The child processes will send SIGQUIT when unrecoverable
# errors happen. We kill the process tree here so that the
# stack trace is very evident.
# TODO: rather than killing the main process, we should
# figure out how to raise an AsyncEngineDeadError and
# handle at the API server level so we can return a better
# error code to the clients calling VLLM.
def sigquit_handler(signum, frame):
    """React to SIGQUIT by logging a fatal message and killing the tree.

    Args:
        signum: Signal number passed in by the signal machinery (unused).
        frame: Interrupted stack frame (unused).
    """
    fatal_msg = ("AsyncLLM got SIGQUIT from worker processes, shutting "
                 "down. See stack trace above for root cause issue.")
    logger.fatal(fatal_msg)

    # Hand our own PID to kill_process_tree so the whole tree goes down.
    this_pid = os.getpid()
    kill_process_tree(this_pid)

signal.signal(signal.SIGQUIT, sigquit_handler)

assert start_engine_loop

self.log_requests = log_requests
Expand Down
2 changes: 1 addition & 1 deletion vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def signal_handler(signum, frame):
except Exception:
traceback = get_exception_traceback()
logger.error("EngineCore hit an exception: %s", traceback)
parent_process.send_signal(signal.SIGQUIT)
parent_process.send_signal(signal.SIGUSR1)

finally:
if engine_core is not None:
Expand Down
19 changes: 18 additions & 1 deletion vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import signal
import weakref
from abc import ABC, abstractmethod
from typing import List, Type
Expand All @@ -8,7 +10,8 @@

from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.utils import get_open_zmq_ipc_path, make_zmq_socket
from vllm.utils import (get_open_zmq_ipc_path, kill_process_tree,
make_zmq_socket)
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
EngineCoreProfile, EngineCoreRequest,
EngineCoreRequestType, EngineCoreRequestUnion)
Expand Down Expand Up @@ -134,6 +137,20 @@ def __init__(
executor_class: Type[Executor],
log_stats: bool = False,
):
# The child processes will send SIGUSR1 when unrecoverable
# errors happen. We kill the process tree here so that the
# stack trace is very evident.
# TODO(rob): rather than killing the main process, we should
# figure out how to raise an AsyncEngineDeadError and
# handle at the API server level so we can return a better
# error code to the clients calling VLLM.
def sigusr1_handler(signum, frame):
    """React to SIGUSR1 by logging a fatal message and killing the tree.

    Args:
        signum: Signal number passed in by the signal machinery (unused).
        frame: Interrupted stack frame (unused).
    """
    fatal_msg = ("Got fatal signal from worker processes, shutting "
                 "down. See stack trace above for root cause issue.")
    logger.fatal(fatal_msg)

    # Hand our own PID to kill_process_tree so the whole tree goes down.
    this_pid = os.getpid()
    kill_process_tree(this_pid)

signal.signal(signal.SIGUSR1, sigusr1_handler)

# Serialization setup.
self.encoder = PickleEncoder()
self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs)
Expand Down
24 changes: 21 additions & 3 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from multiprocessing.process import BaseProcess
from typing import Any, Dict, List, Optional, Tuple

import psutil
import zmq

from vllm.config import VllmConfig
Expand Down Expand Up @@ -38,6 +39,19 @@ def __init__(self, vllm_config: VllmConfig) -> None:
# and ensure workers will be terminated.
self._finalizer = weakref.finalize(self, self.shutdown)

# The child processes will send SIGUSR1 when unrecoverable
# errors happen.
def sigusr1_handler(signum, frame):
    """Handle SIGUSR1 sent by a child process after an unrecoverable error.

    Logs the failure, forwards SIGUSR1 to this process's parent, and
    then shuts the executor down.

    Args:
        signum: Signal number delivered to the handler (unused).
        frame: Stack frame that was interrupted (unused).
    """
    logger.fatal(
        "MultiprocExecutor got fatal signal from worker processes, "
        "shutting down. See stack trace above for root cause issue.")
    try:
        # Propagate error up to parent process. psutil returns None
        # when no parent is known, in which case there is nobody to
        # notify.
        parent_process = psutil.Process().parent()
        if parent_process is not None:
            parent_process.send_signal(signal.SIGUSR1)
    finally:
        # Always run shutdown, even if notifying the parent raised.
        self.shutdown()

signal.signal(signal.SIGUSR1, sigusr1_handler)

self.vllm_config = vllm_config
self.parallel_config = vllm_config.parallel_config

Expand Down Expand Up @@ -335,8 +349,11 @@ def signal_handler(signum, frame):
except SystemExit:
logger.debug("Worker interrupted.")

except BaseException as e:
logger.exception(e)
except Exception:
# worker_busy_loop sends exceptions to Executor
# for shutdown, but if there is an error in startup or an
# error with IPC itself, we need to alert the parent.
psutil.Process().parent().send_signal(signal.SIGUSR1)
raise

finally:
Expand Down Expand Up @@ -377,9 +394,10 @@ def worker_busy_loop(self):

try:
output = getattr(self.worker, method)(*args, **kwargs)
except BaseException as e:
except Exception as e:
self.worker_response_mq.enqueue(
(WorkerProc.ResponseStatus.FAILURE, e))
logger.exception("WorkerProc hit an exception: %s", exc_info=e)
continue

self.worker_response_mq.enqueue(
Expand Down

0 comments on commit 1543914

Please sign in to comment.