-
Notifications
You must be signed in to change notification settings - Fork 0
PR_002 - CodeRabbit #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: bench/PR_002_base
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,7 +1,5 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import pickle | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import signal | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from contextlib import contextmanager | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Iterator, List, Optional, Union | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -17,10 +15,10 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| IPC_HEALTH_EXT, IPC_INPUT_EXT, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| IPC_OUTPUT_EXT, REQUEST_OUTPUTS_T, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| VLLM_RPC_SUCCESS_STR, RPCAbortRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RPCError, RPCProcessRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RPCStartupRequest, RPCStartupResponse) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RPCError, RPCHealthRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RPCProcessRequest, RPCStartupRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RPCStartupResponse) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # yapf: enable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from vllm.envs import VLLM_RPC_TIMEOUT | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from vllm.logger import init_logger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from vllm.outputs import RequestOutput | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from vllm.usage.usage_lib import UsageContext | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -93,30 +91,16 @@ def __init__(self, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.output_socket = self.ctx.socket(zmq.constants.PUSH) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.output_socket.bind(f"{ipc_path}{IPC_OUTPUT_EXT}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Send heartbeats back to client. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Send health status back to client. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.health_socket = self.ctx.socket(zmq.constants.PUSH) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.health_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # IPC path for the data socket. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Error state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._errored_with: Optional[BaseException] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Heartbeat thread | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| daemon=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._heartbeat_stop_event = threading.Event() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # The heartbeat needs to be faster than what the client will wait for | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # The VLLM_RPC_TIMEOUT duration is in ms, and we need one in seconds | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_interval_seconds = VLLM_RPC_TIMEOUT / 5000.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._last_alive_time = time.time() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # The heartbeats can tolerate a long period of the engine chugging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # away at a generation request. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # The VLLM_RPC_TIMEOUT duration is in ms, and we need one in seconds | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.last_alive_threshold = VLLM_RPC_TIMEOUT * 3.0 / 1000.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def dead_error(self) -> BaseException: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._errored_with is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -147,8 +131,6 @@ def start(self): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug("Starting Startup Loop.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.run_startup_loop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug("Starting heartbeat thread") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_thread.start() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug("Starting Engine Loop.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.run_engine_loop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -162,7 +144,6 @@ def start(self): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def cleanup(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Cleanup zeromq state on shutdown.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Closes all sockets and destroys context. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._heartbeat_stop_event.set() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.ctx.destroy(linger=0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| del self.engine | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -201,11 +182,9 @@ def run_engine_loop(self): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Core busy loop of the LLMEngine.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._alive() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not self.engine.has_unfinished_requests(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Poll until there is work to do. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._alive() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.engine.do_log_stats() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug("Waiting for new requests in engine loop.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -221,6 +200,7 @@ def run_engine_loop(self): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def engine_step(self) -> List[RequestOutput]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Engine step wrapper with error handling.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self.engine.step() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except SystemExit: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -249,9 +229,10 @@ def handle_new_input(self): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._handle_process_request(request) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif isinstance(request, RPCAbortRequest): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._handle_abort_request(request) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif isinstance(request, RPCHealthRequest): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._handle_health_request() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError("Unknown RPCRequest Type: " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{type(request)}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError("Unknown RPCRequest Type: {request}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._set_errored(e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -298,32 +279,13 @@ def _handle_abort_request(self, request: RPCAbortRequest): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self.log_requests: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Aborted request %s.", request.request_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _heartbeat_loop(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while not self._heartbeat_stop_event.wait( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeout=self.heartbeat_interval_seconds): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Loops until the stop event is set | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._heartbeat() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug("Exiting MQLLMEngine heartbeat thread") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _heartbeat(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Send unhealthy if engine has already errored | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _handle_health_request(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._errored_with is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._send_unhealthy(self._errored_with) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Check for life of the main loop | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif time.time() - self._last_alive_time > self.last_alive_threshold: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._send_unhealthy(RuntimeError("Engine loop has died")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Otherwise- check health of the engine | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # self.engine.check_health() raises on unhealthy | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.engine.check_health() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._send_healthy() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._set_errored(e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._send_unhealthy(e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Raises error if unhealthy. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.engine.check_health() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._send_healthy() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+282
to
+288
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing early return after sending unhealthy status. If Proposed fix def _handle_health_request(self):
if self._errored_with is not None:
self._send_unhealthy(self._errored_with)
+ return
# Raises error if unhealthy.
self.engine.check_health()
self._send_healthy()📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send List of RequestOutput to RPCClient.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -333,14 +295,12 @@ def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _send_healthy(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send HEALTHY message to RPCClient.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not self.heartbeat_socket.closed: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.health_socket.send_multipart(HEALTHY_RESPONSE, copy=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _send_unhealthy(self, error: BaseException): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send UNHEALTHY message to RPCClient.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not self.heartbeat_socket.closed: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| error_bytes = pickle.dumps(error) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.heartbeat_socket.send_multipart((error_bytes, ), copy=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| error_bytes = pickle.dumps(error) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.health_socket.send_multipart((error_bytes, ), copy=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _async_socket_engine_callback(self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_outputs: REQUEST_OUTPUTS_T): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -353,9 +313,6 @@ def _set_errored(self, e: BaseException): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._errored_with is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._errored_with = e | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _alive(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._last_alive_time = time.time() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def run_mp_engine(engine_args: AsyncEngineArgs, usage_context: UsageContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ipc_path: str): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing f-string prefix causes incorrect error message.
Line 235 is missing the
fprefix, so{request}will be printed literally instead of the actual request value.Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents