1818from vllm .config import VllmConfig
1919from vllm .logger import init_logger
2020from vllm .lora .request import LoRARequest
21- from vllm .utils import (get_open_zmq_ipc_path , kill_process_tree ,
22- make_zmq_socket )
21+ from vllm .utils import (get_open_zmq_inproc_path , get_open_zmq_ipc_path ,
22+ kill_process_tree , make_zmq_socket )
2323from vllm .v1 .engine import (EngineCoreOutputs , EngineCoreRequest ,
2424 EngineCoreRequestType , UtilityOutput )
2525from vllm .v1 .engine .core import EngineCore , EngineCoreProc
@@ -202,10 +202,11 @@ class BackgroundResources:
202202 """Used as a finalizer for clean shutdown, avoiding
203203 circular reference back to the client object."""
204204
205- ctx : Union [zmq .Context , zmq . asyncio . Context ] = None
205+ ctx : Union [zmq .Context ] = None
206206 output_socket : Union [zmq .Socket , zmq .asyncio .Socket ] = None
207207 input_socket : Union [zmq .Socket , zmq .asyncio .Socket ] = None
208208 proc_handle : Optional [BackgroundProcHandle ] = None
209+ shutdown_path : Optional [str ] = None
209210
210211 def __call__ (self ):
211212 """Clean up background resources."""
@@ -218,8 +219,13 @@ def __call__(self):
218219 self .output_socket .close (linger = 0 )
219220 if self .input_socket is not None :
220221 self .input_socket .close (linger = 0 )
221- if self .ctx is not None :
222- self .ctx .destroy (linger = 0 )
222+ if self .shutdown_path is not None :
223+ # We must ensure that the sync output socket is
224+ # closed cleanly in its own thread.
225+ with self .ctx .socket (zmq .PAIR ) as shutdown_sender :
226+ shutdown_sender .connect (self .shutdown_path )
227+ # Send shutdown signal.
228+ shutdown_sender .send (b'' )
223229
224230
225231class MPClient (EngineCoreClient ):
@@ -261,28 +267,23 @@ def sigusr1_handler(signum, frame):
261267 self .decoder = MsgpackDecoder (EngineCoreOutputs )
262268
263269 # ZMQ setup.
264- self .ctx = (
265- zmq .asyncio .Context () # type: ignore[attr-defined]
266- if asyncio_mode else zmq .Context ()) # type: ignore[attr-defined]
270+ sync_ctx = zmq .Context ()
271+ self .ctx = zmq .asyncio .Context (sync_ctx ) if asyncio_mode else sync_ctx
267272
268273 # This will ensure resources created so far are closed
269274 # when the client is garbage collected, even if an
270275 # exception is raised mid-construction.
271- resources = BackgroundResources (ctx = self . ctx )
272- self ._finalizer = weakref .finalize (self , resources )
276+ self . resources = BackgroundResources (ctx = sync_ctx )
277+ self ._finalizer = weakref .finalize (self , self . resources )
273278
274- # Paths and sockets for IPC.
275- output_path = get_open_zmq_ipc_path ()
279+ # Paths for IPC.
280+ self . output_path = get_open_zmq_ipc_path ()
276281 input_path = get_open_zmq_ipc_path ()
277- resources .output_socket = make_zmq_socket (self .ctx , output_path ,
278- zmq .constants .PULL )
279- resources .input_socket = make_zmq_socket (self .ctx , input_path ,
280- zmq .constants .PUSH )
281282
282283 # Start EngineCore in background process.
283- resources .proc_handle = BackgroundProcHandle (
284+ self . resources .proc_handle = BackgroundProcHandle (
284285 input_path = input_path ,
285- output_path = output_path ,
286+ output_path = self . output_path ,
286287 process_name = "EngineCore" ,
287288 target_fn = EngineCoreProc .run_engine_core ,
288289 process_kwargs = {
@@ -291,8 +292,10 @@ def sigusr1_handler(signum, frame):
291292 "log_stats" : log_stats ,
292293 })
293294
294- self .output_socket = resources .output_socket
295- self .input_socket = resources .input_socket
295+ # Create input socket.
296+ self .resources .input_socket = make_zmq_socket (self .ctx , input_path ,
297+ zmq .constants .PUSH )
298+ self .input_socket = self .resources .input_socket
296299 self .utility_results : dict [int , AnyFuture ] = {}
297300
298301 def shutdown (self ):
@@ -325,27 +328,48 @@ def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],
325328
326329 # Ensure that the outputs socket processing thread does not have
327330 # a ref to the client which prevents gc.
328- output_socket = self .output_socket
331+ ctx = self .ctx
332+ output_path = self .output_path
329333 decoder = self .decoder
330334 utility_results = self .utility_results
331335 outputs_queue = self .outputs_queue
332336
337+ shutdown_path = get_open_zmq_inproc_path ()
338+ self .resources .shutdown_path = shutdown_path
339+
333340 def process_outputs_socket ():
341+ shutdown_socket = ctx .socket (zmq .PAIR )
342+ shutdown_socket .bind (shutdown_path )
343+ out_socket = make_zmq_socket (ctx , output_path , zmq .constants .PULL )
334344 try :
345+ poller = zmq .Poller ()
346+ poller .register (shutdown_socket )
347+ poller .register (out_socket )
335348 while True :
336- (frame , ) = output_socket .recv_multipart (copy = False )
349+ socks = poller .poll ()
350+ if not socks :
351+ continue
352+ if len (socks ) == 2 or socks [0 ][0 ] == shutdown_socket :
353+ # shutdown signal, exit thread.
354+ break
355+
356+ (frame , ) = out_socket .recv_multipart (copy = False )
337357 outputs = decoder .decode (frame .buffer )
338358 if outputs .utility_output :
339359 _process_utility_output (outputs .utility_output ,
340360 utility_results )
341361 else :
342362 outputs_queue .put_nowait (outputs )
343- except zmq .error .ContextTerminated :
344- # Expected when the class is GC'd / during process termination.
345- pass
363+ finally :
364+ # Close sockets.
365+ shutdown_socket .close (linger = 0 )
366+ out_socket .close (linger = 0 )
346367
347368 # Process outputs from engine in separate thread.
348- Thread (target = process_outputs_socket , daemon = True ).start ()
369+ self .output_queue_thread = Thread (target = process_outputs_socket ,
370+ name = "EngineCoreOutputQueueThread" ,
371+ daemon = True )
372+ self .output_queue_thread .start ()
349373
350374 def get_output (self ) -> EngineCoreOutputs :
351375 return self .outputs_queue .get ()
@@ -424,10 +448,13 @@ async def _start_output_queue_task(self):
424448 # Perform IO in separate task to parallelize as much as possible.
425449 # Avoid task having direct reference back to the client.
426450 self .outputs_queue = asyncio .Queue ()
427- output_socket = self .output_socket
428451 decoder = self .decoder
429452 utility_results = self .utility_results
430453 outputs_queue = self .outputs_queue
454+ output_path = self .output_path
455+ output_socket = make_zmq_socket (self .ctx , output_path ,
456+ zmq .constants .PULL )
457+ self .resources .output_socket = output_socket
431458
432459 async def process_outputs_socket ():
433460 while True :
@@ -439,7 +466,8 @@ async def process_outputs_socket():
439466 else :
440467 outputs_queue .put_nowait (outputs )
441468
442- self .queue_task = asyncio .create_task (process_outputs_socket ())
469+ self .queue_task = asyncio .create_task (process_outputs_socket (),
470+ name = "EngineCoreOutputQueueTask" )
443471
444472 async def get_output_async (self ) -> EngineCoreOutputs :
445473 if self .outputs_queue is None :
0 commit comments