
Conversation


@WoosukKwon WoosukKwon commented Aug 22, 2025

Closes #23233

This PR restructures the core loop for both synchronous and asynchronous scheduling.

For sync scheduling, this allows overlapping grammar-bitmask construction with model execution.
For async scheduling, this allows overlapping input preparation (and bitmask construction) with model execution. Currently, only deserialization of scheduler outputs is overlapped; this could be expanded in future PRs.
Additionally, this PR refactors the async scheduling loop to make it easier to understand, and enables structured outputs support.
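The sync-scheduling overlap described above can be sketched roughly as follows. This is an illustrative sketch only: `run_model` and `build_bitmask` are hypothetical stand-ins for vLLM's `execute_model` and `get_grammar_bitmask`, with sleeps simulating the actual work.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def run_model() -> str:
    time.sleep(0.10)          # stands in for the GPU forward pass
    return "hidden_states"

def build_bitmask() -> str:
    time.sleep(0.08)          # stands in for CPU-side grammar bitmask work
    return "bitmask"

def step_overlapped():
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=1) as pool:
        model_future = pool.submit(run_model)   # kick off "execution"
        bitmask = build_bitmask()               # build bitmask concurrently
        hidden = model_future.result()          # join before sampling
    return hidden, bitmask, time.perf_counter() - start

hidden, bitmask, elapsed = step_overlapped()
# With overlap, the step takes roughly max(0.10, 0.08) s
# rather than the serial 0.10 + 0.08 s.
print(hidden, bitmask, elapsed)
```

The point of the restructuring is exactly this shape: once `execute_model` is issued, the CPU is free to do the bitmask work instead of idling until the forward pass finishes.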


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request restructures the core scheduling loop to better support asynchronous operations, which is a significant and positive change. My review focuses on ensuring that this refactoring doesn't introduce regressions. I've identified a few issues: a critical bug where the RayDistributedExecutor is not updated to the new interface, and a couple of high-severity issues related to missing error handling and a missing import. Addressing these will help ensure the stability and correctness of the new implementation.

Comment on lines 82 to 90
    def prepare_inputs(self, scheduler_output) -> None:
        self.collective_rpc("prepare_inputs", args=(scheduler_output, ))

    def execute_model(self) -> None:
        self.collective_rpc("execute_model")

    def sample(self, grammar_bitmask) -> ModelRunnerOutput:
        output = self.collective_rpc("sample", args=(grammar_bitmask, ))
        return output[0]

critical

The RayDistributedExecutor in vllm/v1/executor/ray_distributed_executor.py has not been updated to implement the new abstract methods prepare_inputs, execute_model, and sample. It still implements the old execute_model method. This will lead to a TypeError at runtime when using the Ray executor, as it won't be a valid subclass of the Executor abstract base class. Please update RayDistributedExecutor to align with this new interface.

Comment on lines 289 to 294
        self.model_executor.prepare_inputs(scheduler_output)
        self.model_executor.execute_model()
        bitmask = self.scheduler.get_grammar_bitmask(scheduler_output)
        model_output = self.model_executor.sample(bitmask)
        engine_core_outputs = self.scheduler.update_from_output(
-           scheduler_output, model_output)  # type: ignore
+           scheduler_output, model_output)

high

The call to execute_model_with_error_logging has been removed in this refactoring of the step method, and it's also missing from the new step_async method. This is a potential regression as model execution errors will no longer be caught and logged with detailed context, which can make debugging difficult. It's recommended to reintroduce the error handling in both methods. For step, you could wrap the new multi-step execution logic in a helper function and pass it to execute_model_with_error_logging.

Suggested change
-        self.model_executor.prepare_inputs(scheduler_output)
-        self.model_executor.execute_model()
-        bitmask = self.scheduler.get_grammar_bitmask(scheduler_output)
-        model_output = self.model_executor.sample(bitmask)
-        engine_core_outputs = self.scheduler.update_from_output(
-            scheduler_output, model_output)  # type: ignore
-            scheduler_output, model_output)
+        def _model_step(scheduler_output: "SchedulerOutput"):
+            self.model_executor.prepare_inputs(scheduler_output)
+            self.model_executor.execute_model()
+            bitmask = self.scheduler.get_grammar_bitmask(scheduler_output)
+            return self.model_executor.sample(bitmask)
+        model_output = self.execute_model_with_error_logging(
+            _model_step, scheduler_output)
+        engine_core_outputs = self.scheduler.update_from_output(
+            scheduler_output, model_output)
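The wrapper pattern this suggestion relies on can be sketched generically. `execute_with_error_logging` below is a hypothetical stand-in; vLLM's actual `execute_model_with_error_logging` may differ in signature and logged context.

```python
import logging
from typing import Callable, TypeVar

logger = logging.getLogger("engine")
T = TypeVar("T")

def execute_with_error_logging(fn: Callable[..., T], *args) -> T:
    """Run fn, logging detailed context on failure before re-raising."""
    try:
        return fn(*args)
    except Exception:
        # The traceback plus the arguments make post-mortem debugging
        # of model-execution failures much easier.
        logger.exception("Model execution failed (args=%r)", args)
        raise

# Hypothetical step function standing in for the PR's _model_step.
def _model_step(scheduler_output: str) -> str:
    return f"output-for-{scheduler_output}"

result = execute_with_error_logging(_model_step, "batch-0")
print(result)  # output-for-batch-0
```

The benefit is that every code path through the step, not just the single `execute_model` call, is covered by the same error-reporting logic.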

Signed-off-by: Woosuk Kwon <[email protected]>
    ) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
        output = self.collective_rpc("execute_model",
                                     args=(scheduler_output, ))
        del non_block
What's this for?

mergify bot commented Aug 25, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @WoosukKwon.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Aug 25, 2025
@WoosukKwon WoosukKwon marked this pull request as draft August 25, 2025 18:24
Ronald1995 commented Aug 28, 2025

I don't understand how you implement the overlap of prepare_input; it looks like the prepare_input, execute_model, and sample tasks are still executed in order in the worker process.

In #23811, my solution uses two threads in the worker process to overlap prepare_input with the D2H copy operations. Could you please explain how your solution overlaps prepare_input? Thanks!
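For context, the kind of overlap the PR aims for can be illustrated as simple pipelining: input preparation for step N+1 runs while step N is still executing. This is illustrative only; the names and timings are invented, and it is not how vLLM schedules work internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def execute(step: int) -> str:
    time.sleep(0.05)                 # stands in for the GPU forward pass
    return f"out-{step}"

def prepare(step: int) -> str:
    time.sleep(0.04)                 # stands in for CPU input preparation
    return f"in-{step}"

outputs = []
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=1) as pool:
    pending = pool.submit(execute, 0)        # step 0 already prepared
    for step in range(1, 4):
        prepare(step)                        # overlaps with execute(step - 1)
        outputs.append(pending.result())     # join previous step
        pending = pool.submit(execute, step)
    outputs.append(pending.result())
elapsed = time.perf_counter() - start
# Pipelined: roughly 4 * 0.05 s total, versus 4 * (0.05 + 0.04) s serial.
print(outputs, elapsed)
```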



Successfully merging this pull request may close these issues.

[RFC]: Restructure the core loop to allow more asynchrony
