1717 maybe_register_config_serialize_by_value )
1818from vllm .utils import get_exception_traceback , zmq_socket_ctx
1919from vllm .v1 .core .scheduler import Scheduler
20- from vllm .v1 .engine import (EngineCoreOutput , EngineCoreOutputs ,
21- EngineCoreProfile , EngineCoreRequest ,
22- EngineCoreRequestType , EngineCoreRequestUnion )
20+ from vllm .v1 .engine import (EngineCoreOutputs , EngineCoreProfile ,
21+ EngineCoreRequest , EngineCoreRequestType ,
22+ EngineCoreRequestUnion )
2323from vllm .v1 .engine .mm_input_mapper import MMInputMapperServer
2424from vllm .v1 .executor .abstract import Executor
2525from vllm .v1 .request import Request , RequestStatus
2828
2929logger = init_logger (__name__ )
3030
31- POLLING_TIMEOUT_MS = 5000
32- POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
33- LOGGING_TIME_S = 5
31+ POLLING_TIMEOUT_S = 2.5
3432
3533
3634class EngineCore :
@@ -40,10 +38,8 @@ def __init__(
4038 self ,
4139 vllm_config : VllmConfig ,
4240 executor_class : Type [Executor ],
43- log_stats : bool = False ,
4441 ):
4542 assert vllm_config .model_config .runner_type != "pooling"
46- self .log_stats = log_stats
4743
4844 logger .info ("Initializing an LLM engine (v%s) with config: %s" ,
4945 VLLM_VERSION , vllm_config )
@@ -62,8 +58,6 @@ def __init__(
6258 vllm_config .cache_config ,
6359 vllm_config .lora_config )
6460
65- self ._last_logging_time = time .time ()
66-
6761 self .mm_input_mapper_server = MMInputMapperServer (
6862 vllm_config .model_config )
6963
@@ -114,11 +108,12 @@ def abort_requests(self, request_ids: List[str]):
114108 self .scheduler .finish_requests (request_ids ,
115109 RequestStatus .FINISHED_ABORTED )
116110
117- def step (self ) -> List [ EngineCoreOutput ] :
111+ def step (self ) -> EngineCoreOutputs :
118112 """Schedule, execute, and make output."""
119113
120114 if not self .scheduler .has_unfinished_requests ():
121- return []
115+ return EngineCoreOutputs (
116+ outputs = [], scheduler_stats = self .scheduler .make_stats ())
122117
123118 scheduler_output = self .scheduler .schedule ()
124119 output = self .model_executor .execute_model (scheduler_output )
@@ -145,15 +140,17 @@ def __init__(
145140 executor_class : Type [Executor ],
146141 log_stats : bool = False ,
147142 ):
148- super ().__init__ (vllm_config , executor_class , log_stats )
143+ super ().__init__ (vllm_config , executor_class )
144+
145+ self .log_stats = log_stats
149146
150147 # Background Threads and Queues for IO. These enable us to
151148 # overlap ZMQ socket IO with GPU since they release the GIL,
152149 # and to overlap some serialization/deserialization with the
153150 # model forward pass.
154151 # Threads handle Socket <-> Queues and core_busy_loop uses Queue.
155152 self .input_queue : queue .Queue [EngineCoreRequestUnion ] = queue .Queue ()
156- self .output_queue : queue .Queue [List [ EngineCoreOutput ] ] = queue .Queue ()
153+ self .output_queue : queue .Queue [EngineCoreOutputs ] = queue .Queue ()
157154 threading .Thread (target = self .process_input_socket ,
158155 args = (input_path , ),
159156 daemon = True ).start ()
@@ -217,8 +214,10 @@ def run_busy_loop(self):
217214 self ._handle_client_request (req )
218215 break
219216 except queue .Empty :
220- self ._log_stats ()
221217 logger .debug ("EngineCore busy loop waiting." )
218+ # Break out the loop so we can log_stats in step().
219+ if self .log_stats :
220+ break
222221 except BaseException :
223222 raise
224223
@@ -230,28 +229,9 @@ def run_busy_loop(self):
230229 # 3) Step the engine core.
231230 outputs = self .step ()
232231
233- # 4 ) Put EngineCoreOutputs into the output queue.
232+ # 5 ) Put EngineCoreOutputs into the output queue.
234233 self .output_queue .put_nowait (outputs )
235234
236- self ._log_stats ()
237-
238- def _log_stats (self ):
239- """Log basic stats every LOGGING_TIME_S"""
240-
241- if not self .log_stats :
242- return
243-
244- now = time .time ()
245-
246- if now - self ._last_logging_time > LOGGING_TIME_S :
247- logger .info (
248- "RUNNING: %s | WAITING: %s" ,
249- len (self .scheduler .running ),
250- len (self .scheduler .waiting ),
251- )
252-
253- self ._last_logging_time = now
254-
255235 def _handle_client_request (self , request : EngineCoreRequestUnion ) -> None :
256236 """Handle EngineCoreRequest or EngineCoreABORT from Client."""
257237
@@ -301,7 +281,6 @@ def process_output_socket(self, output_path: str):
301281
302282 with zmq_socket_ctx (output_path , zmq .constants .PUSH ) as socket :
303283 while True :
304- engine_core_outputs = self .output_queue .get ()
305- outputs = EngineCoreOutputs (outputs = engine_core_outputs )
284+ outputs = self .output_queue .get ()
306285 encoder .encode_into (outputs , buffer )
307286 socket .send_multipart ((buffer , ), copy = False )
0 commit comments