Skip to content

Conversation

@JasonZhang517
Copy link
Contributor

  • add a communicator including server and client pool
  • add pybind interface of the communicator
  • add test files

pybind11_add_module(engine ${SOURCES} ${CACHE_ALLOCATOR_SOURCES}
transfer_engine/transfer_engine_py.cpp
../mooncake-transfer-engine/src/transport/coro_rpc_connector/cororpc_interface.cpp
../mooncake-transfer-engine/src/transport/coro_rpc_connector/cororpc_communicator.cpp
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to add these files here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to use coro_rpc_connector as a submodule of mooncake transfer engine, we have to compile it here

return false;
}

int CoroRPCCommunicator::sendData(const std::string& target_address,
Copy link

@poor-circle poor-circle Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendData should be a wrapper of sendDataAsync, dont rewrite same code twice

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use syncAwait(Lazy{}) as impl

auto future_ptr = std::make_shared<pybind11::object>(future_obj);
auto loop_ptr = std::make_shared<pybind11::object>(pybind11::reinterpret_borrow<pybind11::object>(loop));

auto task_func = std::make_shared<std::function<void()>>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send_request or
auto lazy = [](int i) -> async_simple::coro::Lazy {
co_await c->send_data_async();
}

lazy(i).start({});


auto task_func = std::make_shared<std::function<void()>>(
[communicator, target_addr, data_holder, future_ptr, loop_ptr]() {
int result = communicator->sendData(*target_addr, data_holder->data(), data_holder->size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

co_await communicator->sendDataAsync

return future_obj;
}

int CoroRPCInterface::sendTensor(const std::string& target_address, pybind11::handle tensor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according async_send_tensor

return future_obj;
}

void CoroRPCInterface::setDataReceiveCallback(pybind11::function callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use asyncio future

@stmatengss stmatengss requested a review from qicosmos September 4, 2025 03:45
@JasonZhang517 JasonZhang517 changed the title Coro rpc communicator [Transfer Engine]Coro rpc communicator Sep 4, 2025
@stmatengss
Copy link
Collaborator

@JasonZhang517 Do you happen to have any updates for this PR?

@JasonZhang517 JasonZhang517 force-pushed the coro_rpc_communicator branch 2 times, most recently from 77b6863 to abd15a1 Compare September 23, 2025 07:50
run: ${SCCACHE_PATH} --show-stats

- name: Install dependencies
shell: bash
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move it here?

@stmatengss stmatengss requested review from Copilot and removed request for poor-circle November 4, 2025 05:52
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for CoRo RPC communication to the Mooncake transfer engine, enabling bandwidth testing between client and server using the yalantinglibs coro_rpc framework. The implementation provides both synchronous and asynchronous APIs for data and tensor transfer with Python bindings.

Key changes:

  • Added CoroRPCInterface and CoroRPCCommunicator classes for RPC communication
  • Integrated pybind11 for Python bindings with improved CMake configuration
  • Created a bandwidth test tool to measure communication performance

Reviewed Changes

Copilot reviewed 11 out of 12 changed files in this pull request and generated 17 comments.

Show a summary per file
File Description
mooncake-transfer-engine/tests/communicator_bandwidth_test.py New bandwidth test script for measuring RPC communication throughput
mooncake-transfer-engine/src/transport/coro_rpc_connector/cororpc_interface.cpp Core RPC interface implementation with data/tensor transfer methods
mooncake-transfer-engine/src/transport/coro_rpc_connector/cororpc_communicator.cpp RPC communicator implementation handling client pools and server operations
mooncake-transfer-engine/src/transport/coro_rpc_connector/CMakeLists.txt Build configuration for coro_rpc_connector module
mooncake-transfer-engine/src/transport/CMakeLists.txt Updated to include coro_rpc_connector subdirectory
mooncake-transfer-engine/src/CMakeLists.txt Added pybind11 configuration and Python dependencies
mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_interface.h Header defining CoroRPCInterface API
mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_communicator.h Header defining CoroRPCCommunicator API
mooncake-integration/transfer_engine/transfer_engine_py.cpp Python bindings for CoroRPCInterface
mooncake-integration/CMakeLists.txt Updated to link coro_rpc_connector
CMakeLists.txt Enhanced pybind11 discovery logic
.gitignore Whitespace fix

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants