Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions mooncake-common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ option(USE_TCP "option for using TCP transport" ON)
option(USE_ASCEND "option for using npu with HCCL" OFF)
option(USE_ASCEND_DIRECT "option for using ascend npu with adxl engine" OFF)
option(USE_ASCEND_HETEROGENEOUS "option for transferring between ascend npu and gpu" OFF)
option(USE_ASCEND_HETEROGENEOUS_TCP "Option to use TCP for transmission between Ascend NPU and GPU" OFF)
option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF)
option(USE_CXL "option for using CXL protocol" OFF)
option(USE_ETCD "option for enable etcd as metadata server" OFF)
Expand Down Expand Up @@ -175,11 +176,16 @@ if (USE_ASCEND_DIRECT)
add_compile_definitions(USE_ASCEND_DIRECT)
endif()

if (USE_ASCEND_HETEROGENEOUS)
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
set(ASCEND_LIB_DIR "${ASCEND_TOOLKIT_ROOT}/lib64")
set(ASCEND_INCLUDE_DIR "${ASCEND_TOOLKIT_ROOT}/include")
add_compile_definitions(USE_ASCEND_HETEROGENEOUS)
if (USE_ASCEND_HETEROGENEOUS)
add_compile_definitions(USE_ASCEND_HETEROGENEOUS)
endif()
if (USE_ASCEND_HETEROGENEOUS_TCP)
add_compile_definitions(USE_ASCEND_HETEROGENEOUS_TCP)
endif()
include_directories(/usr/local/include /usr/include ${ASCEND_INCLUDE_DIR})
link_directories(${ASCEND_LIB_DIR})
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ int TransferEnginePy::initializeExt(const char *local_hostname,

free_list_.resize(kSlabSizeKBTabLen);
#if !defined(USE_ASCEND) && !defined(USE_ASCEND_DIRECT) && \
!defined(USE_ASCEND_HETEROGENEOUS)
!defined(USE_ASCEND_HETEROGENEOUS) && !defined(USE_ASCEND_HETEROGENEOUS_TCP)
doBuddyAllocate(kMaxClassId);
#endif
return 0;
Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ if (USE_ASCEND)
)
endif()

if (USE_ASCEND_HETEROGENEOUS)
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
set(ASCEND_INCLUDE_DIR "${ASCEND_TOOLKIT_ROOT}/include")
include_directories(/usr/local/include
Expand Down
5 changes: 5 additions & 0 deletions mooncake-transfer-engine/example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ if (USE_ASCEND_DIRECT)
target_link_libraries(transfer_engine_ascend_direct_perf PUBLIC transfer_engine)
endif()

# Build the heterogeneous Ascend perf initiator example for TCP-only builds.
# The USE_ASCEND_HETEROGENEOUS block that follows declares the same target, and
# CMake rejects a second add_executable() with an already-used target name, so
# this block must not fire when that option is enabled as well.
if (USE_ASCEND_HETEROGENEOUS_TCP AND NOT USE_ASCEND_HETEROGENEOUS)
  add_executable(transfer_engine_heterogeneous_ascend_perf_initiator transfer_engine_heterogeneous_ascend_perf_initiator.cpp)
  target_link_libraries(transfer_engine_heterogeneous_ascend_perf_initiator PUBLIC transfer_engine)
endif()

if (USE_ASCEND_HETEROGENEOUS)
add_executable(transfer_engine_heterogeneous_ascend_perf_initiator transfer_engine_heterogeneous_ascend_perf_initiator.cpp)
target_link_libraries(transfer_engine_heterogeneous_ascend_perf_initiator PUBLIC transfer_engine)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2025 Huawei Technologies Co., Ltd
// Copyright 2024 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef HETEROGENEOUS_TCP_TRANSPORT_H_
#define HETEROGENEOUS_TCP_TRANSPORT_H_

#include "transport/tcp_transport/tcp_transport.h"
#include "acl/acl.h"
#include <atomic>
#include <new>
#include <condition_variable>

// Sizing constants used by HeterogeneousTcpTransport.
// NOTE(review): the units are presumably bytes and the names suggest
// host-side / device-side staging buffers -- confirm against the .cpp.

// 3 * 1024^3, evaluated as unsigned long long.
#define HUGE_HOST_SIZE (3ULL * 1024 * 1024 * 1024)
// 8 * 1024^2, evaluated as int.
#define HUGE_DEVICE_SIZE (8 * 1024 * 1024)
// Element count of HeterogeneousTcpTransport::mem_blocks.
#define HUGE_DEVICE_NUM 4

namespace mooncake {

/// Transport subclass that reports its name as "ascend" and holds a
/// TcpTransport instance together with ACL stream / buffer bookkeeping.
///
/// NOTE(review): the member set (host/device addresses, a task queue, a worker
/// thread) suggests that transfers are staged through host memory and handed
/// to the wrapped TcpTransport by a background loop -- confirm against the
/// implementation file, which is not visible from this header.
class HeterogeneousTcpTransport : public Transport {
   public:
    HeterogeneousTcpTransport();

    ~HeterogeneousTcpTransport();

    /// Installs the transport for `local_server_name` using the supplied
    /// metadata and topology handles. Returns an int status code
    /// (presumably 0 on success -- TODO confirm).
    int install(std::string &local_server_name,
                std::shared_ptr<TransferMetadata> meta,
                std::shared_ptr<Topology> topo) override;

    /// Protocol name reported by this transport.
    const char *getName() const override { return "ascend"; }

    /// Registers a single local buffer of `length` bytes starting at `addr`.
    int registerLocalMemory(void *addr, size_t length,
                            const std::string &location, bool remote_accessible,
                            bool update_metadata) override;

    /// Unregisters a buffer previously passed to registerLocalMemory().
    int unregisterLocalMemory(void *addr, bool update_metadata = true) override;

    /// Batch variant of registerLocalMemory().
    int registerLocalMemoryBatch(const std::vector<BufferEntry> &buffer_list,
                                 const std::string &location) override;

    /// Batch variant of unregisterLocalMemory().
    int unregisterLocalMemoryBatch(
        const std::vector<void *> &addr_list) override;

    /// NOTE(review): presumably creates the ACL stream stored in `stream_`;
    /// confirm in the implementation.
    int createStream();

    /// Submits the requests in `entries` as part of batch `batch_id`.
    Status submitTransfer(BatchID batch_id,
                          const std::vector<TransferRequest> &entries) override;

    /// Submits already-constructed transfer tasks.
    Status submitTransferTask(
        const std::vector<TransferTask *> &task_list) override;

    /// Writes the status of task `task_id` of batch `batch_id` into `status`.
    Status getTransferStatus(BatchID batch_id, size_t task_id,
                             TransferStatus &status) override;

    // Owned TcpTransport instance; public in the original declaration, so it
    // stays public here for callers that may reach into it.
    std::unique_ptr<TcpTransport> transport_{};

   private:
    // NOTE(review): presumably the body run by `transferThread_` -- confirm.
    void transferLoop();

   private:
    /// Move-only bundle of transfer tasks queued in `transferQueues_`.
    struct TransferTaskTCP {
        std::vector<TransferTask *> tasks;
        uint64_t total_length;
        uint64_t devId;

        TransferTaskTCP(TransferTaskTCP &&) = default;
        TransferTaskTCP &operator=(TransferTaskTCP &&) = default;

        TransferTaskTCP(const TransferTaskTCP &) = delete;
        TransferTaskTCP &operator=(const TransferTaskTCP &) = delete;

        TransferTaskTCP(std::vector<TransferTask *> taskList, uint64_t len,
                        uint64_t id)
            : tasks(std::move(taskList)), total_length(len), devId(id) {}
    };

    // NOTE(review): plain bool; if it is read by transferLoop() on another
    // thread while being written elsewhere, it needs synchronization.
    bool running_ = false;
    // Previously left uninitialized; start from a null handle so the member
    // never holds an indeterminate value before it is assigned.
    aclrtStream stream_ = nullptr;
    void *hostAddr_ = nullptr;
    void *devAddr_ = nullptr;
    std::vector<void *> hugeDevAddrs;
    // Previously left uninitialized; zero matches the default of `devId_`.
    int deviceLogicId_ = 0;
    bool firstSubmit_ = true;
    std::mutex memcpy_mutex_;
    uint64_t offset_ = 0;
    std::thread transferThread_;
    std::queue<TransferTaskTCP> transferQueues_;
    std::mutex transfer_mutex_;
    std::condition_variable transfer_cond_;
    std::atomic<int> transfer_counter_{0};
    int devId_ = 0;
    // Value-initialized (all false) so the initializer stays valid for any
    // HUGE_DEVICE_NUM rather than only for exactly four entries.
    std::array<bool, HUGE_DEVICE_NUM> mem_blocks = {};
    std::mutex dev_mtx_;
    std::condition_variable dev_cv_;
};

// Namespace-level shorthands for types nested in Transport, so code in
// namespace mooncake can name them without the Transport:: qualifier.
using TransferRequest = Transport::TransferRequest;
using TransferStatus = Transport::TransferStatus;
using TransferStatusEnum = Transport::TransferStatusEnum;
using SegmentID = Transport::SegmentID;
using BatchID = Transport::BatchID;

} // namespace mooncake

#endif // HETEROGENEOUS_TCP_TRANSPORT_H_
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class TcpTransport : public Transport {
Status getTransferStatus(BatchID batch_id, size_t task_id,
TransferStatus &status) override;

private:
int install(std::string &local_server_name,
std::shared_ptr<TransferMetadata> meta,
std::shared_ptr<Topology> topo);
Expand Down
3 changes: 1 addition & 2 deletions mooncake-transfer-engine/include/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ class Transport {
#endif

// record the origin request
#ifdef USE_ASCEND_HETEROGENEOUS
// need to modify the request's source address, changing it from an NPU
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP) // need to modify the request's source address, changing it from an NPU
// address to a CPU address.
TransferRequest *request = nullptr;
#else
Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ if (USE_ASCEND_DIRECT)
endif()
endif()

if (USE_ASCEND_HETEROGENEOUS)
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
set(ASCEND_LIB_DIR "${ASCEND_TOOLKIT_ROOT}/lib64")
link_directories(${ASCEND_LIB_DIR})
Expand Down
14 changes: 11 additions & 3 deletions mooncake-transfer-engine/src/multi_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#ifdef USE_ASCEND_HETEROGENEOUS
#include "transport/ascend_transport/heterogeneous_rdma_transport.h"
#endif
#ifdef USE_ASCEND_HETEROGENEOUS_TCP
#include "transport/ascend_transport/heterogeneous_tcp_transport.h"
#endif
#ifdef USE_MNNVL
#include "transport/nvlink_transport/nvlink_transport.h"
#endif
Expand Down Expand Up @@ -101,7 +104,7 @@ Status MultiTransport::submitTransfer(
assert(transport);
auto &task = batch_desc.task_list[task_id];
task.batch_id = batch_id;
#ifdef USE_ASCEND_HETEROGENEOUS
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP)
task.request = const_cast<Transport::TransferRequest *>(&request);
#else
task.request = &request;
Expand Down Expand Up @@ -227,6 +230,11 @@ Transport *MultiTransport::installTransport(const std::string &proto,
transport = new HeterogeneousRdmaTransport();
}
#endif
#ifdef USE_ASCEND_HETEROGENEOUS_TCP
else if (std::string(proto) == "ascend") {
transport = new HeterogeneousTcpTransport();
}
#endif
Comment on lines +233 to +237
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This else if block is conditional on USE_ASCEND_HETEROGENEOUS_TCP, but it's syntactically attached to the preceding if-else if chain. If USE_ASCEND_HETEROGENEOUS is disabled, this else if will be dangling and cause a compilation error. The entire block for selecting an ascend transport should be restructured to handle the mutually exclusive options correctly, for example by using a single if-else if chain with #if/#elif/#endif preprocessor directives to ensure correctness regardless of which flags are enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This else if block is conditional on USE_ASCEND_HETEROGENEOUS_TCP, but it's syntactically attached to the preceding if-else if chain. If USE_ASCEND_HETEROGENEOUS is disabled, this else if will be dangling and cause a compilation error. The entire block for selecting an ascend transport should be restructured to handle the mutually exclusive options correctly, for example by using a single if-else if chain with #if/#elif/#endif preprocessor directives to ensure correctness regardless of which flags are enabled.

fixed

#ifdef USE_MNNVL
else if (std::string(proto) == "nvlink") {
transport = new NvlinkTransport();
Expand Down Expand Up @@ -260,11 +268,11 @@ Status MultiTransport::selectTransport(const TransferRequest &entry,
std::to_string(entry.target_id));
}
auto proto = target_segment_desc->protocol;
#ifdef USE_ASCEND_HETEROGENEOUS
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP)
// When USE_ASCEND_HETEROGENEOUS is enabled:
// - Target side directly reuses RDMA Transport
// - Initiator side uses heterogeneous_rdma_transport
if (target_segment_desc->protocol == "rdma") {
if (target_segment_desc->protocol == "rdma" || target_segment_desc->protocol == "tcp") {
proto = "ascend";
}
#endif
Expand Down
4 changes: 2 additions & 2 deletions mooncake-transfer-engine/src/transfer_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ int TransferEngine::init(const std::string &metadata_conn_string,
#else

#if defined(USE_CXL) && !defined(USE_ASCEND) && \
!defined(USE_ASCEND_HETEROGENEOUS)
!defined(USE_ASCEND_HETEROGENEOUS) && !defined(USE_ASCEND_HETEROGENEOUS_TCP)
if (std::getenv("MC_CXL_DEV_PATH") != nullptr) {
Transport *cxl_transport =
multi_transports_->installTransport("cxl", local_topology_);
Expand Down Expand Up @@ -200,7 +200,7 @@ int TransferEngine::init(const std::string &metadata_conn_string,
LOG(INFO) << "Topology discovery complete. Found "
<< local_topology_->getHcaList().size() << " HCAs.";

#ifdef USE_ASCEND_HETEROGENEOUS
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP)
Transport *ascend_transport =
multi_transports_->installTransport("ascend", local_topology_);
if (!ascend_transport) {
Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/src/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ if (USE_ASCEND OR USE_ASCEND_DIRECT)
target_sources(transport PUBLIC $<TARGET_OBJECTS:ascend_transport>)
endif()

if (USE_ASCEND_HETEROGENEOUS)
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
add_subdirectory(ascend_transport)
target_sources(transport PUBLIC $<TARGET_OBJECTS:ascend_transport>)
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ if (USE_ASCEND)
add_subdirectory(hccl_transport/ascend_transport_c)
elseif (USE_ASCEND_HETEROGENEOUS)
file(GLOB ASCEND_SOURCES "heterogeneous_rdma_transport/*.cpp")
elseif (USE_ASCEND_HETEROGENEOUS_TCP)
file(GLOB ASCEND_SOURCES "heterogeneous_tcp_transport/*.cpp")
else ()
file(GLOB ASCEND_SOURCES "ascend_direct_transport/*.cpp")
endif ()
Expand Down
Loading
Loading