
Conversation

@simon-mo
Collaborator

Summary

  • replace the v1 Ray executor with a standalone implementation derived from the former v0 logic
  • drop the legacy Ray executor path from the v0 engine and guard against selecting it
  • refresh tooling references to the removed module
  • ensure the Ray executor returns real Future instances by resolving compiled DAG outputs in a background thread so scheduler queues and KV aggregation observe completion correctly (a rough sketch of the idea follows below)
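
For illustration only, a minimal sketch of that idea, assuming a compiled-DAG output handle and a blocking fetch function (names here are hypothetical stand-ins, not the actual vLLM FutureWrapper): wrap the handle in a real concurrent.futures.Future and fill it in from a background thread, so callers polling done() or blocking on result() observe completion as soon as the DAG output is ready.

from concurrent.futures import Future
from threading import Thread

class FutureWrapper(Future):
    """Future resolved by a background thread once the DAG output is ready."""

    def __init__(self, ref, get_output):
        # ref: compiled-DAG output handle; get_output: blocking call that
        # fetches its value (e.g. ray.get). Both are illustrative stand-ins.
        super().__init__()
        Thread(target=self._resolve, args=(ref, get_output),
               daemon=True).start()

    def _resolve(self, ref, get_output):
        try:
            self.set_result(get_output(ref))
        except Exception as exc:
            # Surface failures to whoever is waiting on the future.
            self.set_exception(exc)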

Testing

  • python -m compileall vllm/v1/executor/ray_distributed_executor.py
  • python -m compileall vllm/engine/llm_engine.py
  • bash tools/mypy.sh (fails: pre-existing typing errors and missing third-party stubs)

https://chatgpt.com/codex/tasks/task_e_68cc992ce0348329bd2b32d08b588b9c

@simon-mo
Collaborator Author

@ruisearch42 PTAL

@mergify mergify bot added the v1 label Sep 19, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request effectively removes the v0 Ray executor and consolidates its logic into a standalone v1 implementation. The change to use a non-blocking FutureWrapper for asynchronous operations is a significant improvement for performance and responsiveness. My main feedback is that the migration has introduced a substantial amount of dead code from the v0 executor's non-SPMD mode, which is not supported in the v1 executor. Removing this dead code would greatly improve the maintainability and clarity of the new RayDistributedExecutor.

Comment on lines 159 to 161
if not self.use_ray_compiled_dag:
    self.driver_exec_method = make_async(
        self.driver_worker.execute_method)
Contributor


high

Since VLLM_USE_RAY_COMPILED_DAG is set to '1' at the beginning of _init_executor, self.use_ray_compiled_dag will always be true. This conditional block is therefore unreachable and can be removed.
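
For illustration, a tiny self-contained demonstration of why the branch is dead, assuming the flag is derived from the environment variable the way the comment describes (variable names are illustrative, not the executor's actual code):

import os

# _init_executor forces the env var on before the flag is read (per the
# review comment above), so the flag can never be false afterwards.
os.environ["VLLM_USE_RAY_COMPILED_DAG"] = "1"
use_ray_compiled_dag = os.environ["VLLM_USE_RAY_COMPILED_DAG"] == "1"

if not use_ray_compiled_dag:
    # Unreachable: the condition above is always False.
    raise AssertionError("never executed")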

Comment on lines 180 to 183
if not self.use_ray_spmd_worker:
    raise RuntimeError(
        "RayDistributedExecutor in v1 requires "
        "VLLM_USE_RAY_SPMD_WORKER=1")
Contributor


high

The self.use_ray_spmd_worker flag is always true for the v1 Ray executor, as enforced in _init_executor. This conditional check is redundant, and the RuntimeError is unreachable. This block can be safely removed.

Comment on lines 326 to 347
if not self.use_ray_spmd_worker:
    for i, each in enumerate(worker_metadata):
        # find and remove the dummy worker from the list
        worker = each.worker
        worker_ip = each.ip
        if self.driver_dummy_worker is None and worker_ip == driver_ip:
            # If the worker is on the same node as the driver, we use it
            # as the resource holder for the driver process.
            self.driver_dummy_worker = worker
            self.driver_worker = RayWorkerWrapper(
                vllm_config=self.vllm_config, rpc_rank=0)
            worker_metadata.pop(i)
            break

logger.debug("workers: %s", worker_metadata)
logger.debug("driver_dummy_worker: %s", self.driver_dummy_worker)
if not self.use_ray_spmd_worker and self.driver_dummy_worker is None:
    raise ValueError(
        "Ray does not allocate any GPUs on the driver node."
        f"Driver IP: {driver_ip}, worker IPs: {worker_ips}."
        "Consider adjusting the Ray placement group or running "
        "the driver on a GPU node.")
Contributor


high

The logic within this if not self.use_ray_spmd_worker: block, and the subsequent check for self.driver_dummy_worker is None, is designed for a non-SPMD setup. Since the v1 Ray executor exclusively uses SPMD workers, this code is unreachable and should be removed to improve code clarity.

# node will be placed first.
sorted_worker_metadata = sorted(worker_metadata,
                                key=sort_by_driver_then_worker_ip)
start_rank = 0 if self.use_ray_spmd_worker else 1
Contributor


high

Since self.use_ray_spmd_worker is always true, this line can be simplified.

Suggested change
start_rank = 0 if self.use_ray_spmd_worker else 1
start_rank = 0

Comment on lines 510 to 518
def _driver_execute_model(
    self, execute_model_req: Optional[ExecuteModelRequest]
) -> Optional[List[SamplerOutput]]:
    """Run execute_model in the driver worker."""

    assert not self.use_ray_spmd_worker, (
        "driver_worker does not exist for VLLM_USE_RAY_SPMD_WORKER=1")
    return self.driver_worker.execute_method("execute_model",
                                             execute_model_req)
Contributor


high

This method is guarded by an assertion assert not self.use_ray_spmd_worker, which will always fail because the v1 Ray executor requires SPMD mode. Therefore, this method is unreachable and can be removed.

Comment on lines 561 to 565
if not self.use_ray_spmd_worker:
    # Start the driver worker after all the ray workers.
    driver_worker_output = [
        self.driver_worker.execute_method(sent_method, *args, **kwargs)
    ]
Contributor


high

This conditional block handles the non-SPMD case, which is not supported by the v1 Ray executor. It is dead code and can be removed.

Comment on lines 673 to 704
async def _driver_execute_model_async(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None
) -> List[SamplerOutput]:
    assert not self.use_ray_spmd_worker, (
        "driver_worker does not exist for VLLM_USE_RAY_SPMD_WORKER=1")
    if not self.tp_driver_workers:
        return await self.driver_exec_method("execute_model",
                                             execute_model_req)
    if self.pp_locks is None:
        self.pp_locks = [
            asyncio.Lock()
            for _ in range(self.parallel_config.pipeline_parallel_size)
        ]

    tasks = [
        asyncio.create_task(
            _run_task_with_lock(self.driver_exec_method, self.pp_locks[0],
                                "execute_model", execute_model_req))
    ]
    for pp_rank, driver_worker in enumerate(self.tp_driver_workers,
                                            start=1):
        tasks.append(
            asyncio.create_task(
                _run_task_with_lock(driver_worker.execute_method.remote,
                                    self.pp_locks[pp_rank],
                                    "execute_model", execute_model_req)))

    results = await asyncio.gather(*tasks)

    # Only the last PP stage has the final results.
    return results[-1]
Contributor


high

This asynchronous method for the driver is for the non-SPMD case and is guarded by an assertion that will always fail. It is unreachable and should be removed.

Comment on lines 706 to 713
async def _start_worker_execution_loop(self):
    assert not self.use_ray_spmd_worker, (
        "worker loop is disabled for VLLM_USE_RAY_SPMD_WORKER=1")
    coros = [
        worker.execute_method.remote("start_worker_execution_loop")
        for worker in self.non_driver_workers
    ]
    return await asyncio.gather(*coros)
Contributor


high

This method to start the worker execution loop is for the non-SPMD case and is unreachable in the v1 executor. It should be removed.

@simon-mo
Collaborator Author

@codex fix comments

@chatgpt-codex-connector

Summary

Testing

  • python -m compileall vllm/v1/executor/ray_distributed_executor.py


@mergify

mergify bot commented Sep 21, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @simon-mo.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Sep 21, 2025
@njhill
Member

njhill commented Oct 24, 2025

The Ray V0 executor has now been removed in #27142, but that didn't include the FutureWrapper improvements in this PR, which I assume would still be valuable.

  • ensure the Ray executor returns real Future instances by resolving compiled DAG outputs in a background thread so scheduler queues and KV aggregation observe completion correctly

@ruisearch42
Collaborator

Sorry, I think I missed Simon's earlier ping. The FutureWrapper part LGTM.
