
Ronald1995 (Contributor) commented Aug 28, 2025

Purpose

In execute_model, there are update_states and prepare_inputs operations. This PR aims to use multithreading to overlap update_states and prepare_inputs with the GPU-to-CPU copy of sample_token_ids.
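A minimal sketch of the intended overlap, assuming hypothetical helper names (update_states, prepare_inputs, and copy_sample_token_ids_to_cpu stand in for the PR's internals and are not exact vLLM signatures):

from concurrent.futures import ThreadPoolExecutor

copy_pool = ThreadPoolExecutor(max_workers=1)

def execute_model_step(scheduler_output):
    # Start the GPU-to-CPU copy of the sampled token ids on a worker thread
    # so it runs concurrently with the CPU-side bookkeeping below.
    copy_future = copy_pool.submit(copy_sample_token_ids_to_cpu)

    # CPU-side work for the next step proceeds while the copy is in flight.
    update_states(scheduler_output)
    prepare_inputs(scheduler_output)

    # Block only when the copied token ids are actually needed.
    return copy_future.result()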

Test Plan

Test Result


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft in the Google Doc.

mergify bot commented Aug 28, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @Ronald1995.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

mergify bot added the needs-rebase label Aug 28, 2025

gemini-code-assist bot (Contributor) left a comment


Code Review

This pull request introduces an experimental feature for asynchronous model execution to improve performance by overlapping CPU and GPU work. The changes involve adding a new configuration flag async_execute_model, modifying the scheduling and execution logic to handle asynchronous operations, and introducing threading with events for synchronization. While the overall direction is promising for performance, I've identified two critical issues in the implementation that could lead to deadlocks or livelocks. One is in the engine's core stepping logic, and the other is in the multiprocessing executor's handling of the asynchronous execution pipeline. These issues need to be addressed for the feature to function correctly.

Comment on lines +348 to +350
if (self.async_execute_model
        and scheduler_output.total_num_scheduled_tokens == 0):
    return engine_core_outputs, scheduled_batch

critical

This new conditional block introduces two critical issues:

  1. AttributeError Bug: scheduler_output can be None when this block is reached. This occurs if self.batch_queue is full, because scheduler.schedule() is not called, and scheduler_output remains None. Accessing scheduler_output.total_num_scheduled_tokens will then raise an AttributeError.

  2. Potential Livelock: Even if the AttributeError is fixed (e.g., by checking scheduler_output is not None), a logical flaw remains. If this condition is met, the function returns without processing items from self.batch_queue. Since the state that led to this condition might not change, subsequent calls to step_with_batch_queue could repeatedly hit the same condition, causing items in the queue to be starved and leading to a livelock.

The logic for when to process items from the queue versus returning early needs to be reconsidered to avoid these problems.
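One possible shape for a fix, sketched only to illustrate the two missing checks (it assumes self.batch_queue exposes an empty() check; the real queue-draining policy is still up to the author):

if (self.async_execute_model
        and scheduler_output is not None
        and scheduler_output.total_num_scheduled_tokens == 0
        and self.batch_queue.empty()):
    # Short-circuit only when nothing was scheduled AND nothing is pending
    # in the batch queue, so queued batches cannot be starved.
    return engine_core_outputs, scheduled_batch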

Comment on lines +639 to +648
def execute_model_with_queue(self, func, *args, **kwargs):
    """Execute model with a queue for async execution."""
    output = None
    if not self.exe_queue.full():
        output_future = self.exe_thread_pool.submit(func, *args, **kwargs)
        self.exe_queue.put_nowait(output_future)
    if self.exe_queue.full():
        output = self.exe_queue.get().result()
        self.exe_queue.task_done()
    return output

critical

The current implementation of execute_model_with_queue will lead to a deadlock. Here's why:

  1. On the first call to execute_model_with_queue, self.exe_queue is empty, so it's not full. A future is submitted and added to the queue. The function then returns None.
  2. In worker_busy_loop, because the output from execute_model_with_queue is None, the loop continues to the next iteration without sending a response back to the main process via self.worker_response_mq.enqueue() (due to the if not output: continue check).
  3. The MultiprocExecutor in the main process, which made the collective_rpc call, will hang indefinitely waiting for a response that will never arrive.

To prevent this deadlock, execute_model_with_queue must ensure that a response is sent for every execute_model RPC call. The pipelining logic needs to be revised to guarantee a reply, even for the first call that primes the pipeline.
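One sketch of a revision that keeps the one-step pipeline but guarantees a reply for every RPC (EMPTY_OUTPUT is a placeholder sentinel, not an existing vLLM symbol, and worker_busy_loop's "if not output: continue" check would also need to forward it):

def execute_model_with_queue(self, func, *args, **kwargs):
    """Sketch only: always return something so worker_busy_loop replies."""
    if self.exe_queue.full():
        # Pipeline is primed: drain the oldest in-flight step and return it.
        previous_future = self.exe_queue.get()
        output = previous_future.result()
        self.exe_queue.task_done()
    else:
        # Priming call(s): reply with an explicit empty output instead of
        # None so the main process's collective_rpc call does not hang.
        output = EMPTY_OUTPUT
    self.exe_queue.put_nowait(self.exe_thread_pool.submit(func, *args, **kwargs))
    return output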

Ronald1995 changed the title from "[[WIP] implement overlap of prepare_input during execute_model" to "[WIP] implement overlap of prepare_input during execute_model" on Aug 28, 2025
njhill (Member) commented Aug 28, 2025

@Ronald1995 have you looked at #23569

Ronald1995 (Contributor, Author) replied:

> @Ronald1995 have you looked at #23569

I hadn't looked at it before you mentioned it, but I read the code of #23569 today, and it seems we are doing the same thing.

njhill (Member) commented Aug 31, 2025

Thanks @Ronald1995... It's good that you had the same idea, and we appreciate you submitting the contribution!
