3636namespace mooncake {
// File-local constants for this translation unit.
3737namespace {
// NOTE(review): presumably caps how many copies are grouped into one memcpy
// batch; its use is not visible in this excerpt - confirm.
3838constexpr size_t kMemcpyBatchLimit = 4096 ;
39- }
// Bound compared against the `times` argument of connectAndTransfer() before
// it re-issues a transfer that reported adxl::NOT_CONNECTED.
39+ constexpr int32_t kMaxAdxlConnectRetries = 3 ;
40+ } // namespace
// Default constructor: only initializes running_ to false.
4041AscendDirectTransport::AscendDirectTransport () : running_(false ) {}
4142
// Destructor. Most of the body is elided by the hunk boundary below; the
// visible tail clears addr_to_mem_handle_ and then calls adxl_->Finalize().
4243AscendDirectTransport::~AscendDirectTransport () {
@@ -78,6 +79,7 @@ AscendDirectTransport::~AscendDirectTransport() {
7879 }
7980 }
8081 addr_to_mem_handle_.clear ();
// NOTE(review): adxl_ is dereferenced unconditionally here. If this destructor
// can run on an object whose adxl_ was never set (for example install() was not
// called or failed early), this is a null dereference - confirm, and guard if so.
82+ adxl_->Finalize ();
8183}
8284
8385int AscendDirectTransport::install (std::string &local_server_name,
@@ -547,8 +549,9 @@ void AscendDirectTransport::processSliceList(
547549 if (slice_list.empty ()) {
548550 return ;
549551 }
550- auto target_segment_desc =
551- metadata_->getSegmentDescByID (slice_list[0 ]->target_id );
552+ auto it = need_update_metadata_segs_.find (slice_list[0 ]->target_id );
553+ auto target_segment_desc = metadata_->getSegmentDescByID (
554+ slice_list[0 ]->target_id , (it != need_update_metadata_segs_.end ()));
552555 if (!target_segment_desc) {
553556 LOG (ERROR) << " Cannot find segment descriptor for target_id: "
554557 << slice_list[0 ]->target_id ;
@@ -557,6 +560,9 @@ void AscendDirectTransport::processSliceList(
557560 }
558561 return ;
559562 }
563+ if (it != need_update_metadata_segs_.end ()) {
564+ need_update_metadata_segs_.erase (it);
565+ }
560566 auto target_adxl_engine_name =
561567 (target_segment_desc->rank_info .hostIp + " :" +
562568 std::to_string (target_segment_desc->rank_info .hostPort ));
@@ -582,10 +588,14 @@ void AscendDirectTransport::processSliceList(
582588 << " us" ;
583589 return ;
584590 }
591+ return connectAndTransfer (target_adxl_engine_name, operation, slice_list);
592+ }
593+
// Connects to target_adxl_engine_name via checkAndConnect() and then handles
// the transfer result for slice_list. Parts of the body are elided by the hunk
// boundaries below, so only the visible branches are described:
//   - checkAndConnect() != 0: every slice in slice_list is marked failed.
//   - status == adxl::NOT_CONNECTED: the connection is dropped with
//     disconnect(name, 0, true) and the call is re-issued with times + 1
//     while times < kMaxAdxlConnectRetries.
//   - final else branch: its visible tail calls disconnect(name, 10) and
//     records slice_list[0]->target_id in need_update_metadata_segs_.
// `times` is the retry counter carried through the recursive calls; its
// initial value (presumably 0) is declared outside this excerpt - confirm.
594+ void AscendDirectTransport::connectAndTransfer (
595+ const std::string &target_adxl_engine_name, adxl::TransferOp operation,
596+ const std::vector<Slice *> &slice_list, int32_t times) {
585597 int ret = checkAndConnect (target_adxl_engine_name);
586598 if (ret != 0 ) {
587- LOG (ERROR) << " Failed to connect to segment: "
588- << target_segment_desc->name ;
589599 for (auto &slice : slice_list) {
590600 slice->markFailed ();
591601 }
@@ -613,6 +623,14 @@ void AscendDirectTransport::processSliceList(
613623 std::chrono::steady_clock::now () - start)
614624 .count ()
615625 << " us" ;
626+ } else if (status == adxl::NOT_CONNECTED) {
627+ LOG (INFO) << " Connection reset by backend, retry times:" << times;
// force=true: the visible disconnect() skips adxl_->Disconnect in this mode
// and only erases the entry from connected_segments_.
628+ disconnect (target_adxl_engine_name, 0 , true );
629+ if (times < kMaxAdxlConnectRetries ) {
630+ return connectAndTransfer (target_adxl_engine_name, operation,
631+ slice_list, times + 1 );
632+ }
// NOTE(review): when the retry budget is exhausted this returns without
// calling markFailed() on the slices, unlike the checkAndConnect() failure
// path above. Unless they are marked elsewhere, the slices are left without
// a final status - confirm and mark them failed here if so.
633+ return ;
616634 } else {
617635 if (status == adxl::TIMEOUT) {
618636 LOG (ERROR) << " Transfer timeout to: " << target_adxl_engine_name
@@ -628,6 +646,7 @@ void AscendDirectTransport::processSliceList(
628646 // the connection is probably broken.
629647 // set small timeout to just release local res.
630648 disconnect (target_adxl_engine_name, 10 );
// Remembers the target id; processSliceList() looks it up in this set, passes
// the result as the second argument of getSegmentDescByID(), then erases it.
649+ need_update_metadata_segs_.emplace (slice_list[0 ]->target_id );
631650 }
632651}
633652
@@ -844,21 +863,24 @@ int AscendDirectTransport::checkAndConnect(
844863}
845864
846865int AscendDirectTransport::disconnect (
847- const std::string &target_adxl_engine_name, int32_t timeout_in_millis) {
866+ const std::string &target_adxl_engine_name, int32_t timeout_in_millis,
867+ bool force) {
848868 std::lock_guard<std::mutex> lock (connection_mutex_);
849869 auto it = connected_segments_.find (target_adxl_engine_name);
850870 if (it == connected_segments_.end ()) {
851871 LOG (INFO) << " Target adxl engine: " << target_adxl_engine_name
852872 << " is not connected." ;
853873 return 0 ;
854874 }
855- auto status =
856- adxl_->Disconnect (target_adxl_engine_name.c_str (), timeout_in_millis);
857- if (status != adxl::SUCCESS) {
858- LOG (ERROR) << " Failed to disconnect to: " << target_adxl_engine_name
859- << " , status: " << status;
860- connected_segments_.erase (target_adxl_engine_name);
861- return -1 ;
875+ if (!force) {
876+ auto status = adxl_->Disconnect (target_adxl_engine_name.c_str (),
877+ timeout_in_millis);
878+ if (status != adxl::SUCCESS) {
879+ LOG (ERROR) << " Failed to disconnect to: " << target_adxl_engine_name
880+ << " , status: " << status;
881+ connected_segments_.erase (target_adxl_engine_name);
882+ return -1 ;
883+ }
862884 }
863885 connected_segments_.erase (target_adxl_engine_name);
864886 return 0 ;
0 commit comments