diff --git a/include/ur_client_library/comm/pipeline.h b/include/ur_client_library/comm/pipeline.h index f18f17410..aafd0f754 100644 --- a/include/ur_client_library/comm/pipeline.h +++ b/include/ur_client_library/comm/pipeline.h @@ -366,13 +366,9 @@ class Pipeline */ void stop() { - if (!running_) - return; - URCL_LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str()); running_ = false; - producer_.stopProducer(); if (pThread_.joinable()) { diff --git a/include/ur_client_library/comm/producer.h b/include/ur_client_library/comm/producer.h index 1a9887506..19a929a70 100644 --- a/include/ur_client_library/comm/producer.h +++ b/include/ur_client_library/comm/producer.h @@ -43,6 +43,7 @@ class URProducer : public IProducer URStream& stream_; Parser& parser_; std::chrono::seconds timeout_; + std::function on_reconnect_cb_; bool running_; @@ -124,9 +125,21 @@ class URProducer : public IProducer if (!running_) return true; + if (stream_.getState() == SocketState::Connected) + { + continue; + } + if (stream_.closed()) return false; + if (on_reconnect_cb_) + { + URCL_LOG_WARN("Failed to read from stream, invoking on reconnect callback and stopping the producer"); + on_reconnect_cb_(); + return false; + } + URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count()); std::this_thread::sleep_for(timeout_); @@ -140,6 +153,18 @@ class URProducer : public IProducer return false; } + + /*! + * \brief Sets the reconnection callback. Use this to configure a reconnection callback instead of connecting directly + * to the stream again. This is needed for RTDE as it requires setting up the communication again upon reconnection it + * is not enough to just reconnect to the stream. + * + * \param on_reconnect_cb Callback to be invoked when connection is lost to the stream. + */ + void setReconnectionCallback(std::function on_reconnect_cb) + { + on_reconnect_cb_ = on_reconnect_cb; + } }; } // namespace comm } // namespace urcl diff --git a/include/ur_client_library/rtde/rtde_client.h b/include/ur_client_library/rtde/rtde_client.h index d94faa5d1..7343a4ad8 100644 --- a/include/ur_client_library/rtde/rtde_client.h +++ b/include/ur_client_library/rtde/rtde_client.h @@ -231,6 +231,10 @@ class RTDEClient comm::INotifier notifier_; std::unique_ptr> pipeline_; RTDEWriter writer_; + std::atomic reconnecting_; + std::atomic stop_reconnection_; + std::mutex reconnect_mutex_; + std::thread reconnecting_thread_; VersionInformation urcontrol_version_; @@ -249,12 +253,13 @@ class RTDEClient // the robot is booted. std::vector ensureTimestampIsPresent(const std::vector& output_recipe) const; - void setupCommunication(const size_t max_num_tries = 0, + bool setupCommunication(const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)); - bool negotiateProtocolVersion(const uint16_t protocol_version); - void queryURControlVersion(); - void setupOutputs(const uint16_t protocol_version); - void setupInputs(); + uint16_t negotiateProtocolVersion(); + bool queryURControlVersion(); + void setTargetFrequency(); + bool setupOutputs(const uint16_t protocol_version); + bool setupInputs(); void disconnect(); /*! @@ -288,6 +293,12 @@ class RTDEClient * \returns A vector of variable variable_names */ std::vector splitVariableTypes(const std::string& variable_types) const; + + /*! + * \brief Reconnects to the RTDE interface and set the input and output recipes again. + */ + void reconnect(); + void reconnectCallback(); }; } // namespace rtde_interface diff --git a/include/ur_client_library/rtde/rtde_writer.h b/include/ur_client_library/rtde/rtde_writer.h index e47d19961..cec8b7e2d 100644 --- a/include/ur_client_library/rtde/rtde_writer.h +++ b/include/ur_client_library/rtde/rtde_writer.h @@ -61,11 +61,7 @@ class RTDEWriter ~RTDEWriter() { - running_ = false; - if (writer_thread_.joinable()) - { - writer_thread_.join(); - } + stop(); } /*! @@ -88,6 +84,11 @@ class RTDEWriter */ void run(); + /*! + * \brief Stops the writer thread loop. + */ + void stop(); + /*! * \brief Creates a package to request setting a new value for the speed slider. * diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index de92c64db..b7c1bac73 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -46,6 +46,8 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st , notifier_(notifier) , pipeline_(std::make_unique>(*prod_, PIPELINE_NAME, notifier, true)) , writer_(&stream_, input_recipe_) + , reconnecting_(false) + , stop_reconnection_(false) , max_frequency_(URE_MAX_FREQUENCY) , target_frequency_(target_frequency) , client_state_(ClientState::UNINITIALIZED) @@ -69,6 +71,8 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st , notifier_(notifier) , pipeline_(std::make_unique>(*prod_, PIPELINE_NAME, notifier, true)) , writer_(&stream_, input_recipe_) + , reconnecting_(false) + , stop_reconnection_(false) , max_frequency_(URE_MAX_FREQUENCY) , target_frequency_(target_frequency) , client_state_(ClientState::UNINITIALIZED) @@ -77,6 +81,12 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st RTDEClient::~RTDEClient() { + prod_->setReconnectionCallback(nullptr); + stop_reconnection_ = true; + if (reconnecting_thread_.joinable()) + { + reconnecting_thread_.join(); + } disconnect(); } @@ -93,147 +103,131 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m return true; } + prod_->setReconnectionCallback(nullptr); + unsigned int attempts = 0; std::stringstream ss; - while (attempts < max_initialization_attempts) + + while (!setupCommunication(max_connection_attempts, reconnection_timeout)) { - try - { - setupCommunication(max_connection_attempts, reconnection_timeout); - } - catch (const UrException& exc) - { - ss << exc.what() << std::endl; - } - if (client_state_ == ClientState::INITIALIZED) - { - return true; - } - if (++attempts < max_initialization_attempts) + if (++attempts >= max_initialization_attempts) { - URCL_LOG_ERROR("Failed to initialize RTDE client, retrying in %d seconds", initialization_timeout.count() / 1000); - std::this_thread::sleep_for(initialization_timeout); + disconnect(); + ss << "Failed to initialize RTDE client after " << max_initialization_attempts << " attempts"; + throw UrException(ss.str()); } + // disconnect to start on a clean slate when trying to set up communication again + disconnect(); + URCL_LOG_ERROR("Failed to initialize RTDE client, retrying in %d seconds", initialization_timeout.count() / 1000); + std::this_thread::sleep_for(initialization_timeout); } - ss << "Failed to initialize RTDE client after " << max_initialization_attempts << " attempts"; - throw UrException(ss.str()); + // Stop pipeline again + pipeline_->stop(); + client_state_ = ClientState::INITIALIZED; + // Set reconnection callback after we are initialized to ensure that a disconnect during initialization doesn't + // trigger a reconnect + prod_->setReconnectionCallback(std::bind(&RTDEClient::reconnectCallback, this)); + return true; } -void RTDEClient::setupCommunication(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) +bool RTDEClient::setupCommunication(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) { + // The state initializing is used inside disconnect to stop the pipeline again. client_state_ = ClientState::INITIALIZING; - // A running pipeline is needed inside setup - pipeline_->init(max_num_tries, reconnection_time); - pipeline_->run(); - - uint16_t protocol_version = MAX_RTDE_PROTOCOL_VERSION; - while (!negotiateProtocolVersion(protocol_version) && client_state_ == ClientState::INITIALIZING) + // A running pipeline is needed inside setup. + try { - URCL_LOG_INFO("Robot did not accept RTDE protocol version '%hu'. Trying lower protocol version", protocol_version); - protocol_version--; - if (protocol_version == 0) - { - throw UrException("Protocol version for RTDE communication could not be established. Robot didn't accept any of " - "the suggested versions."); - } + pipeline_->init(max_num_tries, reconnection_time); } - if (client_state_ == ClientState::UNINITIALIZED) - return; - - URCL_LOG_INFO("Negotiated RTDE protocol version to %hu.", protocol_version); - parser_.setProtocolVersion(protocol_version); - - queryURControlVersion(); - if (client_state_ == ClientState::UNINITIALIZED) - return; - - if (urcontrol_version_.major < 5) + catch (const UrException& exc) { - max_frequency_ = CB3_MAX_FREQUENCY; + URCL_LOG_ERROR("Caught exception %s, while trying to initialize pipeline", exc.what()); + return false; } + pipeline_->run(); - if (target_frequency_ == 0) - { - // Default to maximum frequency - target_frequency_ = max_frequency_; - } - else if (target_frequency_ <= 0.0 || target_frequency_ > max_frequency_) + uint16_t protocol_version = negotiateProtocolVersion(); + // Protocol version must be above zero + if (protocol_version == 0) { - // Target frequency outside valid range - throw UrException("Invalid target frequency of RTDE connection"); + return false; } - setupOutputs(protocol_version); - if (client_state_ == ClientState::UNINITIALIZED) - return; + bool is_rtde_comm_setup = true; + is_rtde_comm_setup = queryURControlVersion(); - if (!isRobotBooted()) + if (is_rtde_comm_setup) { - disconnect(); - return; + setTargetFrequency(); } + is_rtde_comm_setup = is_rtde_comm_setup && setupOutputs(protocol_version); + + is_rtde_comm_setup = is_rtde_comm_setup && isRobotBooted(); + if (input_recipe_.size() > 0) { - setupInputs(); - if (client_state_ == ClientState::UNINITIALIZED) - return; + is_rtde_comm_setup = is_rtde_comm_setup && setupInputs(); } - - // We finished communication for now - pipeline_->stop(); - client_state_ = ClientState::INITIALIZED; + return is_rtde_comm_setup; } -bool RTDEClient::negotiateProtocolVersion(const uint16_t protocol_version) +uint16_t RTDEClient::negotiateProtocolVersion() { - // Protocol version should always be 1 before starting negotiation - parser_.setProtocolVersion(1); - unsigned int num_retries = 0; - uint8_t buffer[4096]; - size_t size; - size_t written; - size = RequestProtocolVersionRequest::generateSerializedRequest(buffer, protocol_version); - if (!stream_.write(buffer, size, written)) - { - URCL_LOG_ERROR("Sending protocol version query to robot failed, disconnecting"); - disconnect(); - return false; - } - - while (num_retries < MAX_REQUEST_RETRIES) - { - std::unique_ptr package; - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) - { - URCL_LOG_ERROR("failed to get package from rtde interface, disconnecting"); - disconnect(); - return false; - } - if (rtde_interface::RequestProtocolVersion* tmp_version = - dynamic_cast(package.get())) + uint16_t protocol_version = MAX_RTDE_PROTOCOL_VERSION; + while (protocol_version > 0) + { + // Protocol version should always be 1 before starting negotiation + parser_.setProtocolVersion(1); + unsigned int num_retries = 0; + uint8_t buffer[4096]; + size_t size; + size_t written; + size = RequestProtocolVersionRequest::generateSerializedRequest(buffer, protocol_version); + if (!stream_.write(buffer, size, written)) { - // Reset the num_tries variable in case we have to try with another protocol version. - num_retries = 0; - return tmp_version->accepted_; + URCL_LOG_ERROR("Sending protocol version query to robot failed"); + return 0; } - else + + while (num_retries < MAX_REQUEST_RETRIES) { - std::stringstream ss; - ss << "Did not receive protocol negotiation answer from robot. Message received instead: " << std::endl - << package->toString() << ". Retrying..."; - num_retries++; - URCL_LOG_WARN("%s", ss.str().c_str()); + std::unique_ptr package; + if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + { + URCL_LOG_ERROR("failed to get package from RTDE interface"); + return 0; + } + if (rtde_interface::RequestProtocolVersion* tmp_version = + dynamic_cast(package.get())) + { + if (tmp_version->accepted_) + { + URCL_LOG_INFO("Negotiated RTDE protocol version to %hu.", protocol_version); + parser_.setProtocolVersion(protocol_version); + return protocol_version; + } + break; + } + else + { + std::stringstream ss; + ss << "Did not receive protocol negotiation answer from robot. Message received instead: " << std::endl + << package->toString() << ". Retrying..."; + num_retries++; + URCL_LOG_WARN("%s", ss.str().c_str()); + } } + + URCL_LOG_INFO("Robot did not accept RTDE protocol version '%hu'. Trying lower protocol version", protocol_version); + protocol_version--; } - std::stringstream ss; - ss << "Could not negotiate RTDE protocol version after " << MAX_REQUEST_RETRIES - << " tries. Please check the output of the " - "negotiation attempts above to get a hint what could be wrong."; - throw UrException(ss.str()); + URCL_LOG_ERROR("Protocol version for RTDE communication could not be established. Robot didn't accept any of " + "the suggested versions."); + return 0; } -void RTDEClient::queryURControlVersion() +bool RTDEClient::queryURControlVersion() { unsigned int num_retries = 0; uint8_t buffer[4096]; @@ -242,9 +236,8 @@ void RTDEClient::queryURControlVersion() size = GetUrcontrolVersionRequest::generateSerializedRequest(buffer); if (!stream_.write(buffer, size, written)) { - URCL_LOG_ERROR("Sending urcontrol version query request to robot failed, disconnecting"); - disconnect(); - return; + URCL_LOG_ERROR("Sending urcontrol version query request to robot failed"); + return false; } std::unique_ptr package; @@ -252,16 +245,15 @@ void RTDEClient::queryURControlVersion() { if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) { - URCL_LOG_ERROR("No answer to urcontrol version query was received from robot, disconnecting"); - disconnect(); - return; + URCL_LOG_ERROR("No answer to urcontrol version query was received from robot"); + return false; } if (rtde_interface::GetUrcontrolVersion* tmp_urcontrol_version = dynamic_cast(package.get())) { urcontrol_version_ = tmp_urcontrol_version->version_information_; - return; + return true; } else { @@ -276,7 +268,26 @@ void RTDEClient::queryURControlVersion() ss << "Could not query urcontrol version after " << MAX_REQUEST_RETRIES << " tries. Please check the output of the " "negotiation attempts above to get a hint what could be wrong."; - throw UrException(ss.str()); + return false; +} + +void RTDEClient::setTargetFrequency() +{ + if (urcontrol_version_.major < 5) + { + max_frequency_ = CB3_MAX_FREQUENCY; + } + + if (target_frequency_ == 0) + { + // Default to maximum frequency + target_frequency_ = max_frequency_; + } + else if (target_frequency_ <= 0.0 || target_frequency_ > max_frequency_) + { + // Target frequency outside valid range + throw UrException("Invalid target frequency of RTDE connection"); + } } void RTDEClient::resetOutputRecipe(const std::vector new_recipe) @@ -284,12 +295,17 @@ void RTDEClient::resetOutputRecipe(const std::vector new_recipe) disconnect(); output_recipe_.assign(new_recipe.begin(), new_recipe.end()); + + // Reset pipeline first otherwise we will segfault, if the producer object no longer exists, when destroying the + // pipeline + pipeline_.reset(); + parser_ = RTDEParser(output_recipe_); prod_ = std::make_unique>(stream_, parser_); pipeline_ = std::make_unique>(*prod_, PIPELINE_NAME, notifier_, true); } -void RTDEClient::setupOutputs(const uint16_t protocol_version) +bool RTDEClient::setupOutputs(const uint16_t protocol_version) { unsigned int num_retries = 0; size_t size; @@ -317,17 +333,15 @@ void RTDEClient::setupOutputs(const uint16_t protocol_version) // Send output recipe to robot if (!stream_.write(buffer, size, written)) { - URCL_LOG_ERROR("Could not send RTDE output recipe to robot, disconnecting"); - disconnect(); - return; + URCL_LOG_ERROR("Could not send RTDE output recipe to robot"); + return false; } std::unique_ptr package; if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) { - URCL_LOG_ERROR("Did not receive confirmation on RTDE output recipe, disconnecting"); - disconnect(); - return; + URCL_LOG_ERROR("Did not receive confirmation on RTDE output recipe"); + return false; } if (rtde_interface::ControlPackageSetupOutputs* tmp_output = @@ -371,7 +385,7 @@ void RTDEClient::setupOutputs(const uint16_t protocol_version) // Some variables are not available so retry setting up the communication with a stripped-down output recipe resetOutputRecipe(available_variables); - return; + return false; } else { @@ -382,7 +396,7 @@ void RTDEClient::setupOutputs(const uint16_t protocol_version) else { // All variables are accounted for in the RTDE package - return; + return true; } } else @@ -398,10 +412,11 @@ void RTDEClient::setupOutputs(const uint16_t protocol_version) ss << "Could not setup RTDE outputs after " << MAX_REQUEST_RETRIES << " tries. Please check the output of the " "negotiation attempts above to get a hint what could be wrong."; - throw UrException(ss.str()); + URCL_LOG_ERROR(ss.str().c_str()); + return false; } -void RTDEClient::setupInputs() +bool RTDEClient::setupInputs() { unsigned int num_retries = 0; size_t size; @@ -410,9 +425,8 @@ void RTDEClient::setupInputs() size = ControlPackageSetupInputsRequest::generateSerializedRequest(buffer, input_recipe_); if (!stream_.write(buffer, size, written)) { - URCL_LOG_ERROR("Could not send RTDE input recipe to robot, disconnecting"); - disconnect(); - return; + URCL_LOG_ERROR("Could not send RTDE input recipe to robot"); + return false; } while (num_retries < MAX_REQUEST_RETRIES) @@ -420,11 +434,9 @@ void RTDEClient::setupInputs() std::unique_ptr package; if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) { - URCL_LOG_ERROR("Did not receive confirmation on RTDE input recipe, disconnecting"); - disconnect(); - return; + URCL_LOG_ERROR("Did not receive confirmation on RTDE input recipe"); + return false; } - if (rtde_interface::ControlPackageSetupInputs* tmp_input = dynamic_cast(package.get())) @@ -450,7 +462,7 @@ void RTDEClient::setupInputs() } writer_.init(tmp_input->input_recipe_id_); - return; + return true; } else { @@ -465,7 +477,8 @@ void RTDEClient::setupInputs() ss << "Could not setup RTDE inputs after " << MAX_REQUEST_RETRIES << " tries. Please check the output of the " "negotiation attempts above to get a hint what could be wrong."; - throw UrException(ss.str()); + URCL_LOG_ERROR(ss.str().c_str()); + return false; } void RTDEClient::disconnect() @@ -482,6 +495,7 @@ void RTDEClient::disconnect() if (client_state_ > ClientState::UNINITIALIZED) { stream_.disconnect(); + writer_.stop(); } client_state_ = ClientState::UNINITIALIZED; } @@ -602,7 +616,6 @@ bool RTDEClient::sendStart() ss << "Did not receive answer to RTDE start request. Message received instead: " << std::endl << package->toString(); URCL_LOG_WARN("%s", ss.str().c_str()); - return false; } } std::stringstream ss; @@ -635,7 +648,6 @@ bool RTDEClient::sendPause() } if (rtde_interface::ControlPackagePause* tmp = dynamic_cast(package.get())) { - client_state_ = ClientState::PAUSED; return tmp->accepted_; } } @@ -689,16 +701,29 @@ std::vector RTDEClient::ensureTimestampIsPresent(const std::vector< std::unique_ptr RTDEClient::getDataPackage(std::chrono::milliseconds timeout) { - std::unique_ptr urpackage; - if (pipeline_->getLatestProduct(urpackage, timeout)) + // Cannot get data packages while reconnecting as we could end up getting some of the configuration packages + if (reconnect_mutex_.try_lock()) { - rtde_interface::DataPackage* tmp = dynamic_cast(urpackage.get()); - if (tmp != nullptr) + std::unique_ptr urpackage; + if (pipeline_->getLatestProduct(urpackage, timeout)) { - urpackage.release(); - return std::unique_ptr(tmp); + rtde_interface::DataPackage* tmp = dynamic_cast(urpackage.get()); + if (tmp != nullptr) + { + urpackage.release(); + reconnect_mutex_.unlock(); + return std::unique_ptr(tmp); + } } + reconnect_mutex_.unlock(); } + else + { + URCL_LOG_WARN("Unable to get data package while reconnecting to the RTDE interface"); + auto period = std::chrono::duration(1.0 / target_frequency_); + std::this_thread::sleep_for(period); + } + return std::unique_ptr(nullptr); } @@ -723,5 +748,106 @@ std::vector RTDEClient::splitVariableTypes(const std::string& varia } return result; } + +void RTDEClient::reconnect() +{ + URCL_LOG_INFO("Reconnecting to the RTDE interface"); + // Locking mutex to ensure that calling getDataPackage doesn't influence the communication needed for reconfiguring + // the RTDE connection + std::lock_guard lock(reconnect_mutex_); + ClientState cur_client_state = client_state_; + disconnect(); + + const size_t max_initialization_attempts = 3; + size_t cur_initialization_attempt = 0; + bool client_reconnected = false; + while (cur_initialization_attempt < max_initialization_attempts) + { + bool is_communication_setup = false; + try + { + is_communication_setup = setupCommunication(1, std::chrono::milliseconds{ 10000 }); + } + catch (const UrException& exc) + { + URCL_LOG_ERROR("Caught exception while reconnecting to the RTDE interface %s. Unable to reconnect", exc.what()); + disconnect(); + reconnecting_ = false; + return; + } + + const std::string reconnecting_stopped_msg = "Reconnecting has been stopped, because the object is being destroyed"; + if (stop_reconnection_) + { + URCL_LOG_WARN(reconnecting_stopped_msg.c_str()); + return; + } + + if (is_communication_setup) + { + client_reconnected = true; + break; + } + + auto duration = std::chrono::seconds(1); + if (stream_.getState() != comm::SocketState::Connected) + { + // We don't wanna count it as an initialization attempt if we cannot connect to the socket and we want to wait + // longer before reconnecting. + duration = std::chrono::seconds(10); + URCL_LOG_ERROR("Failed to connect to the RTDE server, retrying in %i seconds", duration.count()); + } + else + { + URCL_LOG_ERROR("Failed to initialize RTDE client, retrying in %i second", duration.count()); + cur_initialization_attempt += 1; + } + + disconnect(); + + auto start_time = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start_time < duration) + { + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + if (stop_reconnection_) + { + URCL_LOG_WARN(reconnecting_stopped_msg.c_str()); + return; + } + } + } + + if (client_reconnected == false) + { + URCL_LOG_ERROR("Failed to initialize RTDE client after %i attempts, unable to reconnect", + max_initialization_attempts); + disconnect(); + reconnecting_ = false; + return; + } + + start(); + if (cur_client_state == ClientState::PAUSED) + { + pause(); + } + URCL_LOG_INFO("Done reconnecting to the RTDE interface"); + reconnecting_ = false; +} + +void RTDEClient::reconnectCallback() +{ + if (reconnecting_ || stop_reconnection_) + { + return; + } + if (reconnecting_thread_.joinable()) + { + reconnecting_thread_.join(); + } + reconnecting_ = true; + reconnecting_thread_ = std::thread(&RTDEClient::reconnect, this); +} + } // namespace rtde_interface } // namespace urcl diff --git a/src/rtde/rtde_writer.cpp b/src/rtde/rtde_writer.cpp index 4362c2514..206c4823d 100644 --- a/src/rtde/rtde_writer.cpp +++ b/src/rtde/rtde_writer.cpp @@ -47,6 +47,10 @@ void RTDEWriter::setInputRecipe(const std::vector& recipe) void RTDEWriter::init(uint8_t recipe_id) { + if (running_) + { + return; + } recipe_id_ = recipe_id; package_.initEmpty(); running_ = true; @@ -71,6 +75,15 @@ void RTDEWriter::run() URCL_LOG_DEBUG("Write thread ended."); } +void RTDEWriter::stop() +{ + running_ = false; + if (writer_thread_.joinable()) + { + writer_thread_.join(); + } +} + bool RTDEWriter::sendSpeedSlider(double speed_slider_fraction) { if (speed_slider_fraction > 1.0 || speed_slider_fraction < 0.0) diff --git a/tests/test_pipeline.cpp b/tests/test_pipeline.cpp index 8da69fbed..ca2368d6d 100644 --- a/tests/test_pipeline.cpp +++ b/tests/test_pipeline.cpp @@ -199,6 +199,7 @@ TEST_F(PipelineTest, stop_pipeline) TEST_F(PipelineTest, consumer_pipeline) { + pipeline_.reset(); stream_.reset(new comm::URStream("127.0.0.1", 60002)); producer_.reset(new comm::URProducer(*stream_.get(), *parser_.get())); TestConsumer consumer; @@ -236,6 +237,7 @@ TEST_F(PipelineTest, consumer_pipeline) TEST_F(PipelineTest, connect_non_connected_robot) { + pipeline_.reset(); stream_.reset(new comm::URStream("127.0.0.1", 12321)); producer_.reset(new comm::URProducer(*stream_.get(), *parser_.get())); TestConsumer consumer;