Skip to content

Commit 84cc1fa

Browse files
committed
guard against cancellation in FutureWrapper
Signed-off-by: Nick Hill <[email protected]>
1 parent 45c6b36 commit 84cc1fa

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

vllm/v1/executor/multiproc_executor.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import weakref
1212
from collections import deque
1313
from collections.abc import Callable
14-
from concurrent.futures import Future
14+
from concurrent.futures import Future, InvalidStateError
15+
from contextlib import suppress
1516
from dataclasses import dataclass
1617
from enum import Enum, auto
1718
from functools import cached_property, partial
@@ -66,15 +67,17 @@ def result(self, timeout=None):
6667
# Drain any futures ahead of us in the queue.
6768
while not self.done():
6869
future, get_response = self.futures_queue.pop()
69-
future.update_with_response(get_response)
70+
future.wait_for_response(get_response)
7071
return super().result()
7172

72-
def update_with_response(self, get_response: Callable):
73+
def wait_for_response(self, get_response: Callable):
7374
try:
7475
response = get_response()
75-
self.set_result(response)
76+
with suppress(InvalidStateError):
77+
self.set_result(response)
7678
except Exception as e:
77-
self.set_exception(e)
79+
with suppress(InvalidStateError):
80+
self.set_exception(e)
7881

7982

8083
class MultiprocExecutor(Executor):
@@ -306,7 +309,7 @@ def get_response():
306309
# First drain any pending futures in the queue.
307310
while self.futures_queue:
308311
future, get_fut_response = self.futures_queue.pop()
309-
future.update_with_response(get_fut_response)
312+
future.wait_for_response(get_fut_response)
310313

311314
return get_response()
312315

0 commit comments

Comments
 (0)