diff --git a/rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp b/rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp index 17a5694002..019890a4d0 100644 --- a/rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp @@ -50,13 +50,30 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase // Useful aliases for the action client data types using ResponseCallback = std::function)>; + + // Aliases for the GoalResponse ring buffer using GoalResponse = typename ActionT::Impl::SendGoalService::Response; using GoalResponseSharedPtr = typename std::shared_ptr; + using GoalResponseDataPair = typename std::pair; + using GoalResponseVoidDataPair = typename std::pair>; + using GoalResponsePairSharedPtr = typename std::shared_ptr; + + // Aliases for the ResultResponse ring buffer using ResultResponse = typename ActionT::Impl::GetResultService::Response; using ResultResponseSharedPtr = typename std::shared_ptr; + using ResultResponseDataPair = typename std::pair; + using ResultResponseVoidDataPair = typename std::pair>; + using ResultResponsePairSharedPtr = typename std::shared_ptr; + + // Aliases for the CancelResponse ring buffer + using CancelResponse = typename ActionT::Impl::CancelGoalService::Response; + using CancelResponseSharedPtr = typename std::shared_ptr; + using CancelResponseDataPair = typename std::pair; + using CancelResponseVoidDataPair = typename std::pair>; + using CancelResponsePairSharedPtr = typename std::shared_ptr; + using FeedbackMessage = typename ActionT::Impl::FeedbackMessage; using FeedbackSharedPtr = typename std::shared_ptr; - using CancelGoalSharedPtr = typename std::shared_ptr; using GoalStatusSharedPtr = typename std::shared_ptr; ActionClientIntraProcess( @@ -64,21 +81,21 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase const std::string & action_name, const rcl_action_client_depth_t & 
qos_history, ResponseCallback goal_status_callback, - ResponseCallback feedback_callback) - : goal_status_callback_(goal_status_callback), - feedback_callback_(feedback_callback), - ActionClientIntraProcessBase( + ResponseCallback feedback_callback, + std::recursive_mutex & reentrant_mutex) + : ActionClientIntraProcessBase( context, action_name, - QoS(qos_history.goal_service_depth)) + QoS(qos_history.goal_service_depth), + reentrant_mutex) { // Create the intra-process buffers goal_response_buffer_ = - rclcpp::experimental::create_service_intra_process_buffer( + rclcpp::experimental::create_service_intra_process_buffer( QoS(qos_history.goal_service_depth)); result_response_buffer_ = - rclcpp::experimental::create_service_intra_process_buffer( + rclcpp::experimental::create_service_intra_process_buffer( QoS(qos_history.result_service_depth)); status_buffer_ = @@ -90,8 +107,11 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase QoS(qos_history.feedback_topic_depth)); cancel_response_buffer_ = - rclcpp::experimental::create_service_intra_process_buffer( + rclcpp::experimental::create_service_intra_process_buffer( QoS(qos_history.cancel_service_depth)); + + set_response_callback_to_event_type(EventType::FeedbackReady, feedback_callback); + set_response_callback_to_event_type(EventType::StatusReady, goal_status_callback); } virtual ~ActionClientIntraProcess() = default; @@ -100,6 +120,12 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase { (void) wait_set; + is_goal_response_ready_ = goal_response_buffer_->has_data(); + is_result_response_ready_ = result_response_buffer_->has_data(); + is_cancel_response_ready_ = cancel_response_buffer_->has_data(); + is_feedback_ready_ = feedback_buffer_->has_data(); + is_status_ready_ = status_buffer_->has_data(); + return is_feedback_ready_ || is_status_ready_ || is_goal_response_ready_ || @@ -107,54 +133,63 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase 
is_result_response_ready_; } - // Store responses callbacks. - // We don't use mutex to protect these callbacks since they - // are called always after they are set. - void store_goal_response_callback(ResponseCallback callback) + + void store_goal_response_callback(size_t goal_id, ResponseCallback response_callback) { - goal_response_callback_ = callback; + set_response_callback_to_event_type(EventType::GoalResponse, response_callback, goal_id); } - void store_cancel_goal_callback(ResponseCallback callback) + void store_cancel_goal_callback(size_t goal_id, ResponseCallback callback) { - cancel_goal_callback_ = callback; + set_response_callback_to_event_type(EventType::CancelResponse, callback, goal_id); } - void store_result_response_callback(ResponseCallback callback) + void store_result_response_callback(size_t goal_id, ResponseCallback callback) { - result_response_callback_ = callback; + set_response_callback_to_event_type(EventType::ResultResponse, callback, goal_id); } // Store responses from server - void store_ipc_action_goal_response(GoalResponseSharedPtr goal_response) + void store_ipc_action_goal_response( + GoalResponseSharedPtr goal_response, + size_t goal_id) { - goal_response_buffer_->add(std::move(goal_response)); + goal_response_buffer_->add( + std::make_shared( + std::make_pair(goal_id, std::move(goal_response)))); + gc_.trigger(); - is_goal_response_ready_ = true; - invoke_on_ready_callback(EventType::GoalResponse); + invoke_on_ready_callback(EventType::GoalResponse, goal_id); } - void store_ipc_action_result_response(ResultResponseSharedPtr result_response) + void store_ipc_action_result_response( + ResultResponseSharedPtr result_response, + size_t goal_id) { - result_response_buffer_->add(std::move(result_response)); + result_response_buffer_->add( + std::make_shared( + std::make_pair(goal_id, std::move(result_response)))); + gc_.trigger(); - is_result_response_ready_ = true; - invoke_on_ready_callback(EventType::ResultResponse); + 
invoke_on_ready_callback(EventType::ResultResponse, goal_id); } - void store_ipc_action_cancel_response(CancelGoalSharedPtr cancel_response) + void store_ipc_action_cancel_response( + CancelResponseSharedPtr cancel_response, + size_t goal_id) { - cancel_response_buffer_->add(std::move(cancel_response)); + cancel_response_buffer_->add( + std::make_shared( + std::make_pair(goal_id, std::move(cancel_response)))); + gc_.trigger(); - is_cancel_response_ready_ = true; - invoke_on_ready_callback(EventType::CancelResponse); + invoke_on_ready_callback(EventType::CancelResponse, goal_id); } void store_ipc_action_feedback(FeedbackSharedPtr feedback) { feedback_buffer_->add(std::move(feedback)); gc_.trigger(); - is_feedback_ready_ = true; invoke_on_ready_callback(EventType::FeedbackReady); } @@ -162,31 +197,32 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase { status_buffer_->add(std::move(status)); gc_.trigger(); - is_status_ready_ = true; invoke_on_ready_callback(EventType::StatusReady); } std::shared_ptr take_data() override { + std::shared_ptr data; + if (is_goal_response_ready_) { - auto data = std::move(goal_response_buffer_->consume()); - return std::static_pointer_cast(data); - } else if (is_result_response_ready_) { - auto data = std::move(result_response_buffer_->consume()); - return std::static_pointer_cast(data); - } else if (is_cancel_response_ready_) { - auto data = std::move(cancel_response_buffer_->consume()); - return std::static_pointer_cast(data); - } else if (is_feedback_ready_) { - auto data = std::move(feedback_buffer_->consume()); - return std::static_pointer_cast(data); - } else if (is_status_ready_) { - auto data = std::move(status_buffer_->consume()); - return std::static_pointer_cast(data); - } else { - throw std::runtime_error("Taking data from intra-process action client but nothing is ready"); + data = std::move(goal_response_buffer_->consume()); + } + else if (is_result_response_ready_) { + data = 
std::move(result_response_buffer_->consume()); + } + else if (is_cancel_response_ready_) { + data = std::move(cancel_response_buffer_->consume()); + } + else if (is_feedback_ready_) { + data = std::move(feedback_buffer_->consume()); } + else if (is_status_ready_) { + data = status_buffer_->consume(); + } + + // Data could be null if there were more events than elements in the buffer + return data; } std::shared_ptr @@ -195,70 +231,82 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase // Mark as ready the event type from which we want to take data switch (static_cast(id)) { case EventType::ResultResponse: - is_result_response_ready_ = true; + is_result_response_ready_ = result_response_buffer_->has_data(); break; case EventType::CancelResponse: - is_cancel_response_ready_ = true; + is_cancel_response_ready_ = cancel_response_buffer_->has_data(); break; case EventType::GoalResponse: - is_goal_response_ready_ = true; + is_goal_response_ready_ = goal_response_buffer_->has_data(); break; case EventType::FeedbackReady: - is_feedback_ready_ = true; + is_feedback_ready_ = feedback_buffer_->has_data(); break; case EventType::StatusReady: - is_status_ready_ = true; + is_status_ready_ = status_buffer_->has_data(); break; } return take_data(); } - void execute(std::shared_ptr & data) { - // How to handle case when more than one flag is ready? - // For example, feedback and status are both ready, guard condition triggered - // twice, but we process a single entity here. - // On the default executor using a waitset, waitables are checked twice if ready, - // so that fixes the issue. Check if this is a problem with EventsExecutor. 
if (!data) { - throw std::runtime_error("'data' is empty"); + // This can happen when there were more events than elements in the ring buffer + return; } - if (is_goal_response_ready_) { - is_goal_response_ready_ = false; - goal_response_callback_(std::move(data)); - } else if (is_result_response_ready_) { - is_result_response_ready_ = false; - result_response_callback_(std::move(data)); - } else if (is_cancel_response_ready_) { - is_cancel_response_ready_ = false; - cancel_goal_callback_(std::move(data)); - } else if (is_feedback_ready_) { - is_feedback_ready_ = false; - feedback_callback_(std::move(data)); - } else if (is_status_ready_) { - is_status_ready_ = false; - goal_status_callback_(std::move(data)); - } else { + if (is_goal_response_ready_.exchange(false)) { + auto goal_response_pair = std::static_pointer_cast(data); + auto goal_id = goal_response_pair->first; + auto & goal_response = goal_response_pair->second; + + call_response_callback_and_erase( + EventType::GoalResponse, + goal_response, + goal_id); + } + else if (is_result_response_ready_.exchange(false)) { + auto result_response_pair = std::static_pointer_cast(data); + auto goal_id = result_response_pair->first; + auto & result_response = result_response_pair->second; + + call_response_callback_and_erase( + EventType::ResultResponse, + result_response, + goal_id); + } + else if (is_cancel_response_ready_.exchange(false)) { + auto cancel_response_pair = std::static_pointer_cast(data); + auto goal_id = cancel_response_pair->first; + auto & cancel_response = cancel_response_pair->second; + + call_response_callback_and_erase( + EventType::CancelResponse, + cancel_response, + goal_id); + } + else if (is_feedback_ready_.exchange(false)) { + call_response_callback_and_erase( + EventType::FeedbackReady, data, 0, false); + } + else if (is_status_ready_.exchange(false)) { + call_response_callback_and_erase( + EventType::StatusReady, data, 0, false); + } + else { throw std::runtime_error("Executing 
intra-process action client but nothing is ready"); } } protected: - ResponseCallback goal_response_callback_; - ResponseCallback result_response_callback_; - ResponseCallback cancel_goal_callback_; - ResponseCallback goal_status_callback_; - ResponseCallback feedback_callback_; - - // Create buffers to store data coming from server + // Declare buffers to store responses coming from action server typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< - GoalResponseSharedPtr>::UniquePtr goal_response_buffer_; + GoalResponsePairSharedPtr>::UniquePtr goal_response_buffer_; typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< - ResultResponseSharedPtr>::UniquePtr result_response_buffer_; + ResultResponsePairSharedPtr>::UniquePtr result_response_buffer_; typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< FeedbackSharedPtr>::UniquePtr feedback_buffer_; @@ -266,8 +314,8 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase rclcpp::experimental::buffers::ServiceIntraProcessBuffer< GoalStatusSharedPtr>::UniquePtr status_buffer_; - rclcpp::experimental::buffers::ServiceIntraProcessBuffer< - CancelGoalSharedPtr>::UniquePtr cancel_response_buffer_; + typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< + CancelResponsePairSharedPtr>::UniquePtr cancel_response_buffer_; std::atomic is_feedback_ready_{false}; std::atomic is_status_ready_{false}; diff --git a/rclcpp/include/rclcpp/experimental/action_client_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/action_client_intra_process_base.hpp index e08b916556..16ee058a07 100644 --- a/rclcpp/include/rclcpp/experimental/action_client_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/action_client_intra_process_base.hpp @@ -55,10 +55,12 @@ class ActionClientIntraProcessBase : public rclcpp::Waitable ActionClientIntraProcessBase( rclcpp::Context::SharedPtr context, const std::string & action_name, - const rclcpp::QoS & qos_profile) - : 
gc_(context), - action_name_(action_name), - qos_profile_(qos_profile) + const rclcpp::QoS & qos_profile, + std::recursive_mutex & reentrant_mutex) + : action_name_(action_name), + qos_profile_(qos_profile), + gc_(context), + reentrant_mutex_(reentrant_mutex) {} virtual ~ActionClientIntraProcessBase() = default; @@ -128,122 +130,122 @@ class ActionClientIntraProcessBase : public rclcpp::Waitable "is not callable."); } - set_callback_to_event_type(EventType::ResultResponse, callback); - set_callback_to_event_type(EventType::CancelResponse, callback); - set_callback_to_event_type(EventType::GoalResponse, callback); - set_callback_to_event_type(EventType::FeedbackReady, callback); - set_callback_to_event_type(EventType::StatusReady, callback); + std::lock_guard lock(reentrant_mutex_); + on_ready_callback_ = callback; + + // If we had events happened before the "on_ready" callback was set, + // call callback with the events counter "unread_count". + for (auto& pair : event_info_multi_map_) { + auto & unread_count = pair.second.unread_count; + auto & event_type = pair.second.event_type; + if (unread_count) { + callback(unread_count, static_cast(event_type)); + unread_count = 0; + } + } } void clear_on_ready_callback() override { std::lock_guard lock(reentrant_mutex_); - event_type_to_on_ready_callback_.clear(); + on_ready_callback_ = nullptr; + } + + void erase_goal_info(size_t goal_id) + { + std::lock_guard lock(reentrant_mutex_); + event_info_multi_map_.erase(goal_id); } +private: + std::string action_name_; + QoS qos_profile_; + protected: - std::recursive_mutex reentrant_mutex_; rclcpp::GuardCondition gc_; - // Action client on ready callbacks and unread count. - // These callbacks can be set by the user to be notified about new events - // on the action client like a new goal response, result response, etc. - // These events have a counter associated with them, counting the amount of events - // that happened before having assigned a callback for them. 
- using EventTypeOnReadyCallback = std::function; - using CallbackUnreadCountPair = std::pair; + // Alias for the type used for the server responses callback + using ResponseCallback = std::function /*server response*/)>; + using OnReadyCallback = std::function; + + // Define a structure to hold event information + struct EventInfo { + // The event type + EventType event_type; + // The callback to be called with the responses from the server + ResponseCallback response_callback; + // Counter of events received before the "on_ready" and "response_callback" were set + size_t unread_count; + }; - // Map the different action client event types to their callbacks and unread count. - std::unordered_map event_type_to_on_ready_callback_; + // Mutex to sync operations on the client + std::recursive_mutex& reentrant_mutex_; + OnReadyCallback on_ready_callback_{nullptr}; + std::unordered_multimap event_info_multi_map_; // Invoke the callback to be called when the action client has a new event - void - invoke_on_ready_callback(EventType event_type) + // If the callback hasn't been set, increase the unread count. + void invoke_on_ready_callback( + EventType event_type, + size_t goal_id = 0) { std::lock_guard lock(reentrant_mutex_); - // Search for a callback for this event type - auto it = event_type_to_on_ready_callback_.find(event_type); - - if (it != event_type_to_on_ready_callback_.end()) { - auto & on_ready_callback = it->second.first; - // If there's a callback associated with this event type, call it - if (on_ready_callback) { - on_ready_callback(1); - } else { - // We don't have a callback for this event type yet, - // increase its event counter. 
- auto & event_type_unread_count = it->second.second; - event_type_unread_count++; + auto range = event_info_multi_map_.equal_range(goal_id); + for (auto it = range.first; it != range.second; ++it) { + if (it->second.event_type == event_type) { + if (on_ready_callback_) { + on_ready_callback_(1, static_cast(event_type)); + } else { + it->second.unread_count++; + } + return; } - } else { - // No entries found for this event type, create one - // with an emtpy callback and one unread event. - event_type_to_on_ready_callback_.emplace(event_type, std::make_pair(nullptr, 1)); } - } -private: - std::string action_name_; - QoS qos_profile_; + // If no entry found, create a new one with unread_count = 1 + EventInfo event_info{event_type, nullptr, 1}; + event_info_multi_map_.emplace(goal_id, event_info); + } - void set_callback_to_event_type( + void set_response_callback_to_event_type( EventType event_type, - std::function callback) + ResponseCallback response_callback, + size_t goal_id = 0) { - auto new_callback = create_event_type_callback(callback, event_type); - std::lock_guard lock(reentrant_mutex_); - // Check if we have already an entry for this event type - auto it = event_type_to_on_ready_callback_.find(event_type); - - if (it != event_type_to_on_ready_callback_.end()) { - // We have an entry for this event type, check how many - // events of this event type happened so far. - auto & event_type_unread_count = it->second.second; - if (event_type_unread_count) { - new_callback(event_type_unread_count); - } - event_type_unread_count = 0; - // Set the new callback for this event type - auto & event_type_on_ready_callback = it->second.first; - event_type_on_ready_callback = new_callback; - } else { - // We had no entries for this event type, create one - // with the new callback and zero as unread count. 
- event_type_to_on_ready_callback_.emplace(event_type, std::make_pair(new_callback, 0)); - } + EventInfo event_info{event_type, response_callback, 0}; + event_info_multi_map_.emplace(goal_id, event_info); } - std::function - create_event_type_callback( - std::function callback, - EventType event_type) + void call_response_callback_and_erase( + EventType event_type, + std::shared_ptr & response, + size_t goal_id = 0, + bool erase_event_info = true) { - // Note: we bind the int identifier argument to this waitable's entity types - auto new_callback = - [callback, event_type, this](size_t number_of_events) { - try { - callback(number_of_events, static_cast(event_type)); - } catch (const std::exception & exception) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rclcpp_action"), - "rclcpp::experimental::ActionClientIntraProcessBase@" << this << - " caught " << rmw::impl::cpp::demangle(exception) << - " exception in user-provided callback for the 'on ready' callback: " << - exception.what()); - } catch (...) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rclcpp_action"), - "rclcpp::experimental::ActionClientIntraProcessBase@" << this << - " caught unhandled exception in user-provided callback " << - "for the 'on ready' callback"); - } - }; + std::lock_guard lock(reentrant_mutex_); - return new_callback; + auto range = event_info_multi_map_.equal_range(goal_id); + + for (auto it = range.first; it != range.second; ++it) { + if (it->second.event_type == event_type) { + auto & response_callback = it->second.response_callback; + if (response_callback) { + response_callback(response); + } else { + throw std::runtime_error( + "IPC ActionClient: response_callback not set! 
EventType: " + + std::to_string(static_cast(event_type))); + } + if (erase_event_info) { + event_info_multi_map_.erase(it); + } + return; + } + } } }; diff --git a/rclcpp/include/rclcpp/experimental/action_server_intra_process.hpp b/rclcpp/include/rclcpp/experimental/action_server_intra_process.hpp index 156b8db563..dd307a0c5b 100644 --- a/rclcpp/include/rclcpp/experimental/action_server_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/action_server_intra_process.hpp @@ -102,6 +102,10 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase { (void)wait_set; + goal_request_ready_ = goal_request_buffer_->has_data(); + cancel_request_ready_ = cancel_request_buffer_->has_data(); + result_request_ready_ = result_request_buffer_->has_data(); + return goal_request_ready_ || cancel_request_ready_ || result_request_ready_ || @@ -114,8 +118,8 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase { goal_request_buffer_->add( std::make_pair(ipc_action_client_id, std::move(goal_request))); + gc_.trigger(); - goal_request_ready_ = true; invoke_on_ready_callback(EventType::GoalRequest); } @@ -125,8 +129,8 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase { result_request_buffer_->add( std::make_pair(ipc_action_client_id, std::move(result_request))); + gc_.trigger(); - result_request_ready_ = true; invoke_on_ready_callback(EventType::ResultRequest); } @@ -136,32 +140,30 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase { cancel_request_buffer_->add( std::make_pair(ipc_action_client_id, std::move(cancel_request))); + gc_.trigger(); - cancel_request_ready_ = true; invoke_on_ready_callback(EventType::CancelGoal); } std::shared_ptr take_data() override { + std::shared_ptr data; + if (goal_request_ready_) { - auto data = std::make_shared( - std::move(goal_request_buffer_->consume())); - return std::static_pointer_cast(data); - } else if (cancel_request_ready_) { - auto data = std::make_shared( - 
std::move(cancel_request_buffer_->consume())); - return std::static_pointer_cast(data); - } else if (result_request_ready_) { - auto data = std::make_shared( - std::move(result_request_buffer_->consume())); - return std::static_pointer_cast(data); - } else if (goal_expired_) { - return nullptr; - } else { - throw std::runtime_error("Taking data from action server but nothing is ready"); + data = std::make_shared( + std::move(goal_request_buffer_->consume())); + } + else if (cancel_request_ready_) { + data = std::make_shared( + std::move(cancel_request_buffer_->consume())); } - return nullptr; + else if (result_request_ready_) { + data = std::make_shared( + std::move(result_request_buffer_->consume())); + } + + return data; } std::shared_ptr @@ -170,13 +172,13 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase // Mark as ready the event type from which we want to take data switch (static_cast(id)) { case EventType::GoalRequest: - goal_request_ready_ = true; + goal_request_ready_ = goal_request_buffer_->has_data(); break; case EventType::CancelGoal: - cancel_request_ready_ = true; + cancel_request_ready_ = cancel_request_buffer_->has_data(); break; case EventType::ResultRequest: - result_request_ready_ = true; + result_request_ready_ = result_request_buffer_->has_data(); break; } @@ -186,22 +188,23 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase void execute(std::shared_ptr & data) { if (!data && !goal_expired_) { - throw std::runtime_error("'data' is empty"); + // Empty data can happen when there were more events than elements in the ring buffer + return; } - if (goal_request_ready_) { - goal_request_ready_ = false; + if (goal_request_ready_.exchange(false)) { auto goal_request_data = std::static_pointer_cast(data); execute_goal_request_received_(std::move(goal_request_data)); - } else if (cancel_request_ready_) { - cancel_request_ready_ = false; + } + else if (cancel_request_ready_.exchange(false)) { auto 
cancel_goal_data = std::static_pointer_cast(data); execute_cancel_request_received_(std::move(cancel_goal_data)); - } else if (result_request_ready_) { - result_request_ready_ = false; + } + else if (result_request_ready_.exchange(false)) { auto result_request_data = std::static_pointer_cast(data); execute_result_request_received_(std::move(result_request_data)); - } else if (goal_expired_) { + } + else if (goal_expired_) { // TODO(mauropasse): Handle goal expired case // execute_check_expired_goals(); } else { diff --git a/rclcpp/include/rclcpp/experimental/action_server_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/action_server_intra_process_base.hpp index 3ad2733597..b0e3a6eea1 100644 --- a/rclcpp/include/rclcpp/experimental/action_server_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/action_server_intra_process_base.hpp @@ -127,32 +127,38 @@ class ActionServerIntraProcessBase : public rclcpp::Waitable "is not callable."); } - set_callback_to_event_type(EventType::GoalRequest, callback); - set_callback_to_event_type(EventType::CancelGoal, callback); - set_callback_to_event_type(EventType::ResultRequest, callback); + std::lock_guard lock(reentrant_mutex_); + + on_ready_callback_ = callback; + + for (auto& pair : event_type_to_unread_count_) { + auto & event_type = pair.first; + auto & unread_count = pair.second; + if (unread_count) { + on_ready_callback_(unread_count, static_cast(event_type)); + unread_count = 0; + } + } } void clear_on_ready_callback() override { std::lock_guard lock(reentrant_mutex_); - event_type_to_on_ready_callback_.clear(); + on_ready_callback_ = nullptr; } protected: - std::recursive_mutex reentrant_mutex_; rclcpp::GuardCondition gc_; + std::string action_name_; + QoS qos_profile_; + std::recursive_mutex reentrant_mutex_; - // Action server on ready callbacks and unread count. 
- // These callbacks can be set by the user to be notified about new events - // on the action server like a new goal request, result request or a cancel goal request. - // These events have a counter associated with them, counting the amount of events - // that happened before having assigned a callback for them. - using EventTypeOnReadyCallback = std::function; - using CallbackUnreadCountPair = std::pair; + // Map the different action server event types to their unread count. + std::unordered_map event_type_to_unread_count_; - // Map the different action server event types to their callbacks and unread count. - std::unordered_map event_type_to_on_ready_callback_; + // Generic events callback + std::function on_ready_callback_{nullptr}; // Invoke the callback to be called when the action server has a new event void @@ -160,88 +166,21 @@ class ActionServerIntraProcessBase : public rclcpp::Waitable { std::lock_guard lock(reentrant_mutex_); - // Search for a callback for this event type - auto it = event_type_to_on_ready_callback_.find(event_type); - - if (it != event_type_to_on_ready_callback_.end()) { - auto & on_ready_callback = it->second.first; - // If there's a callback associated with this event type, call it - if (on_ready_callback) { - on_ready_callback(1); - } else { - // We don't have a callback for this event type yet, - // increase its event counter. - auto & event_type_unread_count = it->second.second; - event_type_unread_count++; - } - } else { - // No entries found for this event type, create one - // with an emtpy callback and one unread event. 
- event_type_to_on_ready_callback_.emplace(event_type, std::make_pair(nullptr, 1)); + if (on_ready_callback_) { + on_ready_callback_(1, static_cast(event_type)); + return; } - } -private: - std::string action_name_; - QoS qos_profile_; - - void set_callback_to_event_type( - EventType event_type, - std::function callback) - { - auto new_callback = create_event_type_callback(callback, event_type); - - std::lock_guard lock(reentrant_mutex_); - - // Check if we have already an entry for this event type - auto it = event_type_to_on_ready_callback_.find(event_type); - - if (it != event_type_to_on_ready_callback_.end()) { - // We have an entry for this event type, check how many - // events of this event type happened so far. - auto & event_type_unread_count = it->second.second; - if (event_type_unread_count) { - new_callback(event_type_unread_count); - } - event_type_unread_count = 0; - // Set the new callback for this event type - auto & event_type_on_ready_callback = it->second.first; - event_type_on_ready_callback = new_callback; + auto it = event_type_to_unread_count_.find(event_type); + if (it != event_type_to_unread_count_.end()) { + auto & unread_count = it->second; + // Entry exists, increment unread counter + unread_count++; } else { - // We had no entries for this event type, create one - // with the new callback and zero as unread count. 
- event_type_to_on_ready_callback_.emplace(event_type, std::make_pair(new_callback, 0)); + // Entry doesn't exist, create new with unread_count = 1 + event_type_to_unread_count_[event_type] = 1; } } - - std::function - create_event_type_callback( - std::function callback, - EventType event_type) - { - // Note: we bind the int identifier argument to this waitable's entity types - auto new_callback = - [callback, event_type, this](size_t number_of_events) { - try { - callback(number_of_events, static_cast(event_type)); - } catch (const std::exception & exception) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rclcpp_action"), - "rclcpp::experimental::ActionServerIntraProcessBase@" << this << - " caught " << rmw::impl::cpp::demangle(exception) << - " exception in user-provided callback for the 'on ready' callback: " << - exception.what()); - } catch (...) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rclcpp_action"), - "rclcpp::experimental::ActionServerIntraProcessBase@" << this << - " caught unhandled exception in user-provided callback " << - "for the 'on ready' callback"); - } - }; - - return new_callback; - } }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index ca8bf58a96..b23851ed69 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -62,7 +62,7 @@ class RingBufferImplementation : public BufferImplementationBase * * \param request the element to be stored in the ring buffer */ - void enqueue(BufferT request) + void enqueue(BufferT request) override { std::lock_guard lock(mutex_); @@ -82,7 +82,7 @@ class RingBufferImplementation : public BufferImplementationBase * * \return the element that is being removed from the ring buffer */ - BufferT dequeue() + BufferT dequeue() override { std::lock_guard 
lock(mutex_); @@ -128,7 +128,7 @@ class RingBufferImplementation : public BufferImplementationBase * * \return `true` if there is data and `false` otherwise */ - inline bool has_data() const + inline bool has_data() const override { std::lock_guard lock(mutex_); return has_data_(); @@ -147,7 +147,7 @@ class RingBufferImplementation : public BufferImplementationBase return is_full_(); } - void clear() {} + void clear() override {} private: /// Get the next index value for the ring buffer diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp index fb30e6ee81..bf6129a3c9 100644 --- a/rclcpp/include/rclcpp/experimental/client_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp @@ -76,7 +76,7 @@ class ClientIntraProcess : public ClientIntraProcessBase virtual ~ClientIntraProcess() = default; bool - is_ready(rcl_wait_set_t * wait_set) + is_ready(rcl_wait_set_t * wait_set) override { (void) wait_set; return buffer_->has_data(); @@ -97,7 +97,7 @@ class ClientIntraProcess : public ClientIntraProcessBase return std::static_pointer_cast(data); } - void execute(std::shared_ptr & data) + void execute(std::shared_ptr & data) override { if (!data) { throw std::runtime_error("'data' is empty"); diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 1b33dbce8b..400f04a0b4 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -499,7 +499,9 @@ class IntraProcessManager } } - throw std::runtime_error("No action clients match the specified ID."); + RCLCPP_WARN(rclcpp::get_logger("rclcpp"), + "No action clients match the specified ID: %p", ipc_action_client_id); + return nullptr; } /// Gets an ActionServerIntraProcess matching an intra-process action client ID @@ -517,7 +519,9 @@ class 
IntraProcessManager auto action_client_it = action_clients_to_servers_.find(ipc_action_client_id); if (action_client_it == action_clients_to_servers_.end()) { - throw std::runtime_error("No action clients match the specified ID."); + RCLCPP_WARN(rclcpp::get_logger("rclcpp"), + "No action servers match the specified ID: %p", ipc_action_client_id); + return nullptr; } uint64_t action_service_id = action_client_it->second; @@ -549,30 +553,26 @@ class IntraProcessManager * * \param ipc_action_client_id the id of the action client sending the goal request * \param goal_request the action client's goal request data. - * \param callback the callback to be called when the server sends the goal response + * \return `true` if valid server and intra-process send was successful. */ template - void + bool intra_process_action_send_goal_request( uint64_t ipc_action_client_id, RequestT goal_request, - std::function)> callback) + size_t goal_id) { - // First, lets store the client callback to be called when the - // server sends the goal response - auto client = get_intra_process_action_client(ipc_action_client_id); - - if (client) { - client->store_goal_response_callback(callback); - } - - // Now lets send the goal request - auto service = get_matching_intra_process_action_server(ipc_action_client_id); + auto server = get_matching_intra_process_action_server(ipc_action_client_id); - if (service) { - service->store_ipc_action_goal_request( + if (server) { + server->store_ipc_action_goal_request( ipc_action_client_id, std::move(goal_request)); + + std::unique_lock lock(mutex_); + clients_uuid_to_id_[goal_id] = ipc_action_client_id; + return true; } + return false; } /// Send an intra-process action client cancel request @@ -582,30 +582,22 @@ class IntraProcessManager * * \param ipc_action_client_id the id of the action client sending the cancel request * \param cancel_request the action client's cancel request data. 
- * \param callback the callback to be called when the server sends the cancel response + * \return `true` if valid server and intra-process send was successful. */ template - void + bool intra_process_action_send_cancel_request( uint64_t ipc_action_client_id, - CancelT cancel_request, - std::function)> callback) + CancelT cancel_request) { - // First, lets store the client callback to be called when the - // server sends the cancel response - auto client = get_intra_process_action_client(ipc_action_client_id); + auto server = get_matching_intra_process_action_server(ipc_action_client_id); - if (client) { - client->store_cancel_goal_callback(callback); - } - - // Now lets send the cancel request - auto service = get_matching_intra_process_action_server(ipc_action_client_id); - - if (service) { - service->store_ipc_action_cancel_request( + if (server) { + server->store_ipc_action_cancel_request( ipc_action_client_id, std::move(cancel_request)); + return true; } + return false; } /// Send an intra-process action client result request @@ -615,29 +607,22 @@ class IntraProcessManager * * \param ipc_action_client_id the id of the action client sending the result request * \param result_request the action client's result request data. + * \return `true` if valid server and intra-process send was successful. 
*/ template - void + bool intra_process_action_send_result_request( uint64_t ipc_action_client_id, - RequestT result_request, - std::function)> callback) + RequestT result_request) { - // First, lets store the client callback to be called when the - // server sends the result response - auto client = get_intra_process_action_client(ipc_action_client_id); - - if (client) { - client->store_result_response_callback(callback); - } - - // Now lets send the result request to the server - auto service = get_matching_intra_process_action_server(ipc_action_client_id); + auto server = get_matching_intra_process_action_server(ipc_action_client_id); - if (service) { - service->store_ipc_action_result_request( + if (server) { + server->store_ipc_action_result_request( ipc_action_client_id, std::move(result_request)); + return true; } + return false; } /// Send an intra-process action server goal response @@ -647,17 +632,19 @@ class IntraProcessManager * * \param ipc_action_client_id the id of the action client receiving the response * \param goal_response the action server's goal response data. + * \param goal_id the Goal ID. */ template void intra_process_action_send_goal_response( uint64_t ipc_action_client_id, - ResponseT goal_response) + ResponseT goal_response, + size_t goal_id) { auto client = get_intra_process_action_client(ipc_action_client_id); if (client) { - client->store_ipc_action_goal_response(std::move(goal_response)); + client->store_ipc_action_goal_response(std::move(goal_response), goal_id); } } @@ -668,17 +655,19 @@ class IntraProcessManager * * \param ipc_action_client_id the id of the action client receiving the response * \param cancel_response the action server's cancel response data. + * \param goal_id the Goal ID. 
*/ template void intra_process_action_send_cancel_response( uint64_t ipc_action_client_id, - ResponseT cancel_response) + ResponseT cancel_response, + size_t goal_id) { auto client = get_intra_process_action_client(ipc_action_client_id); if (client) { - client->store_ipc_action_cancel_response(std::move(cancel_response)); + client->store_ipc_action_cancel_response(std::move(cancel_response), goal_id); } } @@ -689,17 +678,19 @@ class IntraProcessManager * * \param ipc_action_client_id the id of the action client receiving the response * \param result_response the action server's result response data. + * \param goal_id the Goal ID. */ template void intra_process_action_send_result_response( uint64_t ipc_action_client_id, - ResponseT result_response) + ResponseT result_response, + size_t goal_id) { auto client = get_intra_process_action_client(ipc_action_client_id); if (client) { - client->store_ipc_action_result_response(std::move(result_response)); + client->store_ipc_action_result_response(std::move(result_response), goal_id); } } diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index c2a0612d68..d6cfe26b36 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -93,10 +93,13 @@ ClientBase::take_type_erased_response(void * response_out, rmw_request_id_t & re &request_header_out, response_out); if (RCL_RET_CLIENT_TAKE_FAILED == ret) { - RCLCPP_ERROR( - rclcpp::get_logger("rclcpp"), - "Error in take_type_erased_response: RCL_RET_CLIENT_TAKE_FAILED. " - "Service name: %s", get_service_name()); + // If we are here, the most common reason is due the client receiving a response + // meant for another client. + // Currently all clients with the same service name will get the server reponse. + // This impacts performances, since the service response is deserialized and + // discarded by clients receiving unwanted responses. 
+ // + // We silently return here, issue tracked by https://github.com/ros2/rclcpp/issues/2397 return false; } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index fd86d815ca..98cef7e4b6 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -79,7 +79,7 @@ IntraProcessManager::add_intra_process_client(ClientIntraProcessBase::SharedPtr } if (can_communicate(client, intra_process_service)) { uint64_t service_id = pair.first; - clients_to_services_.emplace(client_id, service_id); + clients_to_services_[client_id] = service_id; intra_process_service->add_intra_process_client(client, client_id); break; } @@ -121,7 +121,7 @@ IntraProcessManager::add_intra_process_service(ServiceIntraProcessBase::SharedPt } if (can_communicate(client, service)) { uint64_t client_id = pair.first; - clients_to_services_.emplace(client_id, service_id); + clients_to_services_[client_id] = service_id; service->add_intra_process_client(client, client_id); } @@ -147,7 +147,7 @@ IntraProcessManager::add_intra_process_action_client( } if (can_communicate(client, server)) { uint64_t server_id = pair.first; - action_clients_to_servers_.emplace(client_id, server_id); + action_clients_to_servers_[client_id] = server_id; break; } } @@ -187,23 +187,11 @@ IntraProcessManager::add_intra_process_action_server( } if (can_communicate(ipc_action_client, server)) { uint64_t client_id = pair.first; - action_clients_to_servers_.emplace(client_id, server_id); + action_clients_to_servers_[client_id] = server_id; } } - return server_id; -} -// Store an intra-process action client ID along its current -// goal UUID, since later when the server process a request -// it'll use the goal UUID to retrieve the client which asked for -// the result. 
-void -IntraProcessManager::store_intra_process_action_client_goal_uuid( - uint64_t ipc_action_client_id, - size_t uuid) -{ - std::unique_lock lock(mutex_); - clients_uuid_to_id_[uuid] = ipc_action_client_id; + return server_id; } // Remove an action client goal UUID entry @@ -229,12 +217,11 @@ IntraProcessManager::get_action_client_id_from_goal_uuid(size_t uuid) auto iter = clients_uuid_to_id_.find(uuid); - if (iter == clients_uuid_to_id_.end()) { - throw std::runtime_error( - "No ipc action clients stored with this UUID."); + if (iter != clients_uuid_to_id_.end()) { + return iter->second; } - return iter->second; + return 0; } void @@ -438,6 +425,9 @@ IntraProcessManager::get_action_client_intra_process( auto client_it = action_clients_.find(intra_process_action_client_id); if (client_it == action_clients_.end()) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "No action clients match the specified ID: %ld", intra_process_action_client_id); return nullptr; } else { auto client = client_it->second.lock(); @@ -445,6 +435,9 @@ IntraProcessManager::get_action_client_intra_process( return client; } else { action_clients_.erase(client_it); + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Action client out of scope. 
ID: %ld", intra_process_action_client_id); return nullptr; } } diff --git a/rclcpp_action/include/rclcpp_action/client.hpp b/rclcpp_action/include/rclcpp_action/client.hpp index bf9a47f140..fabdc44168 100644 --- a/rclcpp_action/include/rclcpp_action/client.hpp +++ b/rclcpp_action/include/rclcpp_action/client.hpp @@ -336,7 +336,6 @@ class ClientBase : public rclcpp::Waitable std::unordered_map> entity_type_to_on_ready_callback_; // Intra-process action client data fields - std::recursive_mutex ipc_mutex_; bool use_intra_process_{false}; IntraProcessManagerWeakPtr weak_ipm_; uint64_t ipc_action_client_id_; @@ -464,10 +463,11 @@ class Client : public ClientBase auto goal_request = std::make_shared(); goal_request->goal_id.uuid = this->generate_goal_id(); goal_request->goal = goal; + size_t hashed_guuid = std::hash()(goal_request->goal_id.uuid); // The callback to be called when server accepts the goal, using the server // response as argument. - auto callback = + auto goal_response_callback = [this, goal_request, options, promise](std::shared_ptr response) mutable { using GoalResponse = typename ActionT::Impl::SendGoalService::Response; @@ -486,7 +486,7 @@ class Client : public ClientBase std::shared_ptr goal_handle( new GoalHandle(goal_info, options.feedback_callback, options.result_callback)); { - std::lock_guard guard(goal_handles_mutex_); + std::lock_guard guard(goal_handles_mutex_); goal_handles_[goal_handle->get_goal_id()] = goal_handle; } promise->set_value(goal_handle); @@ -501,8 +501,6 @@ class Client : public ClientBase bool intra_process_send_done = false; - std::lock_guard lock(ipc_mutex_); - if (use_intra_process_) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -515,11 +513,13 @@ class Client : public ClientBase // If there's not, we fall back into inter-process communication, since // the server might be available in another process or was configured to not use IPC. 
if (intra_process_server_available) { - ipm->intra_process_action_send_goal_request( - ipc_action_client_id_, - std::move(goal_request), - callback); - intra_process_send_done = true; + ipc_action_client_->store_goal_response_callback( + hashed_guuid, goal_response_callback); + + intra_process_send_done = ipm->intra_process_action_send_goal_request( + ipc_action_client_id_, + std::move(goal_request), + hashed_guuid); } } @@ -527,29 +527,37 @@ class Client : public ClientBase // Send inter-process goal request this->send_goal_request( std::static_pointer_cast(goal_request), - callback); + goal_response_callback); } - // TODO(jacobperron): Encapsulate into it's own function and - // consider exposing an option to disable this cleanup - // To prevent the list from growing out of control, forget about any goals - // with no more user references - { - std::lock_guard guard(goal_handles_mutex_); - auto goal_handle_it = goal_handles_.begin(); - while (goal_handle_it != goal_handles_.end()) { - if (!goal_handle_it->second.lock()) { - RCLCPP_DEBUG( - this->get_logger(), - "Dropping weak reference to goal handle during send_goal()"); - goal_handle_it = goal_handles_.erase(goal_handle_it); - } else { - ++goal_handle_it; + clear_expired_goals(); + + return future; + } + + // TODO(jacobperron): Consider exposing an option to disable this cleanup + // To prevent the list from growing out of control, forget about any goals + // with no more user references + void clear_expired_goals() + { + std::lock_guard guard(goal_handles_mutex_); + auto goal_handle_it = goal_handles_.begin(); + while (goal_handle_it != goal_handles_.end()) { + if (!goal_handle_it->second.lock()) { + size_t hashed_guuid = std::hash()(goal_handle_it->first); + + RCLCPP_DEBUG( + this->get_logger(), + "Dropping weak reference to goal handle during send_goal()"); + goal_handle_it = goal_handles_.erase(goal_handle_it); + + if (use_intra_process_) { + ipc_action_client_->erase_goal_info(hashed_guuid); } + } else { + 
++goal_handle_it; } } - - return future; } /// Asynchronously get the result for an active goal. @@ -565,7 +573,7 @@ class Client : public ClientBase typename GoalHandle::SharedPtr goal_handle, ResultCallback result_callback = nullptr) { - std::lock_guard lock(goal_handles_mutex_); + std::lock_guard lock(goal_handles_mutex_); if (goal_handles_.count(goal_handle->get_goal_id()) == 0) { throw exceptions::UnknownGoalHandleError(); } @@ -600,7 +608,7 @@ class Client : public ClientBase typename GoalHandle::SharedPtr goal_handle, CancelCallback cancel_callback = nullptr) { - std::lock_guard lock(goal_handles_mutex_); + std::lock_guard lock(goal_handles_mutex_); if (goal_handles_.count(goal_handle->get_goal_id()) == 0) { throw exceptions::UnknownGoalHandleError(); } @@ -659,7 +667,7 @@ class Client : public ClientBase virtual ~Client() { - std::lock_guard guard(goal_handles_mutex_); + std::lock_guard guard(goal_handles_mutex_); auto it = goal_handles_.begin(); while (it != goal_handles_.end()) { typename GoalHandle::SharedPtr goal_handle = it->second.lock(); @@ -719,7 +727,7 @@ class Client : public ClientBase void handle_feedback_message(std::shared_ptr message) override { - std::lock_guard guard(goal_handles_mutex_); + std::lock_guard guard(goal_handles_mutex_); using FeedbackMessage = typename ActionT::Impl::FeedbackMessage; typename FeedbackMessage::SharedPtr feedback_message = std::static_pointer_cast(message); @@ -756,7 +764,7 @@ class Client : public ClientBase void handle_status_message(std::shared_ptr message) override { - std::lock_guard guard(goal_handles_mutex_); + std::lock_guard guard(goal_handles_mutex_); using GoalStatusMessage = typename ActionT::Impl::GoalStatusMessage; auto status_message = std::static_pointer_cast(message); for (const GoalStatus & status : status_message->status_list) { @@ -794,7 +802,7 @@ class Client : public ClientBase // The client callback to be called when server calculates the result, using the server // response as argument. 
- auto callback = + auto result_response_callback = [goal_handle, this](std::shared_ptr response) mutable { // Wrap the response in a struct with the fields a user cares about @@ -806,15 +814,13 @@ class Client : public ClientBase wrapped_result.goal_id = goal_handle->get_goal_id(); wrapped_result.code = static_cast(result_response->status); goal_handle->set_result(wrapped_result); - std::lock_guard lock(goal_handles_mutex_); + std::lock_guard lock(goal_handles_mutex_); goal_handles_.erase(goal_handle->get_goal_id()); }; try { bool intra_process_send_done = false; - std::lock_guard lock(ipc_mutex_); - if (use_intra_process_) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -828,11 +834,13 @@ class Client : public ClientBase // If there's not, we fall back into inter-process communication, since // the server might be available in another process or was configured to not use IPC. if (intra_process_server_available) { - ipm->intra_process_action_send_result_request( - ipc_action_client_id_, - std::move(goal_result_request), - callback); - intra_process_send_done = true; + size_t hashed_guuid = std::hash()(goal_handle->get_goal_id()); + ipc_action_client_->store_result_response_callback( + hashed_guuid, result_response_callback); + + intra_process_send_done = ipm->intra_process_action_send_result_request( + ipc_action_client_id_, + std::move(goal_result_request)); } } @@ -840,7 +848,7 @@ class Client : public ClientBase // Send inter-process result request this->send_result_request( std::static_pointer_cast(goal_result_request), - callback); + result_response_callback); } } catch (rclcpp::exceptions::RCLError & ex) { // This will cause an exception when the user tries to access the result @@ -858,7 +866,7 @@ class Client : public ClientBase auto promise = std::make_shared>(); std::shared_future future(promise->get_future()); - auto callback = + auto cancel_goal_callback = [cancel_callback, promise](std::shared_ptr response) mutable { auto cancel_response = 
std::static_pointer_cast(response); @@ -870,8 +878,6 @@ class Client : public ClientBase bool intra_process_send_done = false; - std::lock_guard lock(ipc_mutex_); - if (use_intra_process_) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -884,18 +890,20 @@ class Client : public ClientBase // If there's not, we fall back into inter-process communication, since // the server might be available in another process or was configured to not use IPC. if (intra_process_server_available) { - ipm->intra_process_action_send_cancel_request( - ipc_action_client_id_, - std::move(cancel_request), - callback); - intra_process_send_done = true; + size_t hashed_guuid = std::hash()(cancel_request->goal_info.goal_id.uuid); + ipc_action_client_->store_cancel_goal_callback( + hashed_guuid, cancel_goal_callback); + + intra_process_send_done = ipm->intra_process_action_send_cancel_request( + ipc_action_client_id_, + std::move(cancel_request)); } } if (!intra_process_send_done) { this->send_cancel_request( std::static_pointer_cast(cancel_request), - callback); + cancel_goal_callback); } return future; } @@ -939,12 +947,13 @@ class Client : public ClientBase } rcl_action_client_depth_t qos_history; - qos_history.goal_service_depth = options.goal_service_qos.history; - qos_history.result_service_depth = options.result_service_qos.history; - qos_history.cancel_service_depth = options.cancel_service_qos.history; - qos_history.feedback_topic_depth = options.feedback_topic_qos.history; - qos_history.status_topic_depth = options.status_topic_qos.history; + qos_history.goal_service_depth = options.goal_service_qos.depth; + qos_history.result_service_depth = options.result_service_qos.depth; + qos_history.cancel_service_depth = options.cancel_service_qos.depth; + qos_history.feedback_topic_depth = options.feedback_topic_qos.depth; + qos_history.status_topic_depth = options.status_topic_qos.depth; + // Get full action name, including namespaces. 
std::string remapped_action_name = node_base->resolve_topic_or_service_name(action_name, true); // Create a ActionClientIntraProcess which will be given @@ -955,7 +964,8 @@ class Client : public ClientBase remapped_action_name, qos_history, std::bind(&Client::handle_status_message, this, std::placeholders::_1), - std::bind(&Client::handle_feedback_message, this, std::placeholders::_1)); + std::bind(&Client::handle_feedback_message, this, std::placeholders::_1), + goal_handles_mutex_); // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; @@ -965,7 +975,7 @@ class Client : public ClientBase } std::map goal_handles_; - std::mutex goal_handles_mutex_; + std::recursive_mutex goal_handles_mutex_; using ActionClientIntraProcessT = rclcpp::experimental::ActionClientIntraProcess; std::shared_ptr ipc_action_client_; diff --git a/rclcpp_action/include/rclcpp_action/server.hpp b/rclcpp_action/include/rclcpp_action/server.hpp index 54f25eee67..a15b859a9c 100644 --- a/rclcpp_action/include/rclcpp_action/server.hpp +++ b/rclcpp_action/include/rclcpp_action/server.hpp @@ -199,6 +199,17 @@ class ServerBase : public rclcpp::Waitable rclcpp::Waitable::SharedPtr get_intra_process_waitable(); + std::shared_ptr + lock_intra_process_manager() + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "Intra-process manager already destroyed"); + } + return ipm; + } + // End Waitables API // ----------------- @@ -315,19 +326,9 @@ class ServerBase : public rclcpp::Waitable std::shared_ptr process_cancel_request(rcl_action_cancel_request_t & cancel_request); - RCLCPP_ACTION_PUBLIC - std::shared_ptr - get_result_response(GoalUUID uuid); - // End API for communication between ServerBase and Server<> // --------------------------------------------------------- - // Intra-process action server data fields - std::recursive_mutex ipc_mutex_; - bool use_intra_process_{false}; - IntraProcessManagerWeakPtr weak_ipm_; - uint64_t 
ipc_action_server_id_; - private: /// Handle a request to add a new goal to the server /// \internal @@ -379,6 +380,11 @@ class ServerBase : public rclcpp::Waitable const void * user_data); bool on_ready_callback_set_{false}; + + // Intra-process action server data fields + bool use_intra_process_{false}; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t ipc_action_server_id_; }; /// Action Server @@ -483,20 +489,11 @@ class Server : public ServerBase, public std::enable_shared_from_thisremove_action_server(ipc_action_server_id_); } protected: - // Intra-process version of execute_goal_request_received_ - // Missing: Deep comparison of functionality betwen IPC on/off void ipc_execute_goal_request_received(GoalRequestDataPairSharedPtr data) { @@ -513,24 +510,12 @@ class Server : public ServerBase, public std::enable_shared_from_this(response_pair.second); - auto ipm = weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra_process_action_send_goal_response called " - "after destruction of intra process manager"); - } - - // Here we store the uuid of the goal and associate it with a client - // so we can retrieve it when response is ready, or when sending feedback - // since the feedback calls only provide the goal UUID - // Store an entry - ipm->store_intra_process_action_client_goal_uuid( - intra_process_action_client_id, - std::hash()(uuid)); + auto ipm = lock_intra_process_manager(); ipm->intra_process_action_send_goal_response( intra_process_action_client_id, - std::move(goal_response)); + std::move(goal_response), + std::hash()(uuid)); const auto user_response = response_pair.first; @@ -564,21 +549,13 @@ class Server : public ServerBase, public std::enable_shared_from_thisfirst; CancelRequestSharedPtr request = data->second; - auto ipm = weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra_process_action_send_cancel_response called " - " after destruction of intra process manager"); - } + auto ipm = lock_intra_process_manager(); // 
Convert c++ message to C message rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request(); @@ -597,14 +574,27 @@ class Server : public ServerBase, public std::enable_shared_from_thisgoal_info.goal_id.uuid; + ipm->intra_process_action_send_cancel_response( intra_process_action_client_id, - std::move(response)); + std::move(response), + std::hash()(uuid)); } + ResultResponseSharedPtr + get_result_response(size_t goal_id) + { + auto it = ipc_goal_results_.find(goal_id); + if (it != ipc_goal_results_.end()) { + auto result = it->second; + ipc_goal_results_.erase(it); + return result; + } + + return nullptr; + } - // Intra-process version of execute_result_request_received_ - // See if we can call the server.cpp version of it without doing rcl_action_send_result_response void ipc_execute_result_request_received(ResultRequestDataPairSharedPtr data) { @@ -613,31 +603,125 @@ class Server : public ServerBase, public std::enable_shared_from_this()(uuid); + + std::lock_guard lock(goal_handles_mutex_); - // This is a workaround, I have to find a place to have the - // result response stored somewhere. - std::shared_ptr result_response = this->get_result_response(uuid); + ResultResponseSharedPtr result_response = get_result_response(hashed_uuid); // Check if a result is already available. If not, it will // be sent when ready in the on_terminal_state callback below. 
if (result_response) { // Send the result now - auto ipm = weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra_process_action_send_result_response called " - "after destruction of intra process manager"); - } + auto ipm = lock_intra_process_manager(); - auto typed_response = std::static_pointer_cast(result_response); ipm->intra_process_action_send_result_response( intra_process_action_client_id, - std::move(typed_response)); + std::move(result_response), + hashed_uuid); + } else { + goals_result_requested_.push_back(hashed_uuid); } } + bool + client_requested_response(size_t goal_id, ResultResponseSharedPtr typed_result) + { + std::lock_guard lock(goal_handles_mutex_); + + for (auto it = goals_result_requested_.begin(); it != goals_result_requested_.end(); ++it) { + if (*it == goal_id) { + goals_result_requested_.erase(it); + return true; + } + } + + // The cliend didn't ask fo response yet, so store it to send later + ipc_goal_results_[goal_id] = typed_result; + return false; + } + + bool + ipc_on_terminal_state(const GoalUUID & goal_uuid, std::shared_ptr result_message) + { + auto ipm = lock_intra_process_manager(); + + size_t hashed_uuid = std::hash()(goal_uuid); + + uint64_t ipc_action_client_id = ipm->get_action_client_id_from_goal_uuid(hashed_uuid); + + if (ipc_action_client_id) { + auto typed_result = std::static_pointer_cast(result_message); + + if (client_requested_response(hashed_uuid, typed_result)) { + ipm->template intra_process_action_send_result_response( + ipc_action_client_id, + std::move(typed_result), + hashed_uuid); + + auto status_msg = this->get_status_array(); + + ipm->template intra_process_action_publish_status( + ipc_action_client_id, + std::move(status_msg)); + + ipm->remove_intra_process_action_client_goal_uuid(hashed_uuid); + } + + return false; + } + + RCLCPP_DEBUG( + rclcpp::get_logger("rclcpp_action"), + "Action server can't send result response, missing IPC Action client: %s. 
" + "Will do inter-process publish", + this->action_name_); + + return true; + } + + bool + ipc_on_executing(const GoalUUID & goal_uuid) + { + auto ipm = lock_intra_process_manager(); + + size_t hashed_uuid = std::hash()(goal_uuid); + + uint64_t ipc_action_client_id = ipm->get_action_client_id_from_goal_uuid(hashed_uuid); + + if (ipc_action_client_id) { + // This part would be the IPC version of publish_status(); + auto status_msg = this->get_status_array(); + + ipm->template intra_process_action_publish_status( + ipc_action_client_id, + std::move(status_msg)); + return false; + } + + return true; + } + + + bool + ipc_publish_feedback(typename ActionT::Impl::FeedbackMessage::SharedPtr feedback_msg) + { + auto ipm = lock_intra_process_manager(); + + size_t hashed_uuid = std::hash()(feedback_msg->goal_id.uuid); + + uint64_t ipc_action_client_id = ipm->get_action_client_id_from_goal_uuid(hashed_uuid); + + if (ipc_action_client_id) { + ipm->template intra_process_action_publish_feedback( + ipc_action_client_id, + std::move(feedback_msg)); + return false; + } + + return true; + } + // ----------------------------------------------------- // API for communication between ServerBase and Server<> @@ -660,37 +744,11 @@ class Server : public ServerBase, public std::enable_shared_from_this ipc_lock(shared_this->ipc_mutex_); - + bool send_inter_process_needed = true; if (shared_this->use_intra_process_) { - auto ipm = shared_this->weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra process send called after " - "destruction of intra process manager"); - } - - size_t hashed_uuid = std::hash()(goal_uuid); - - // This part would be the IPC version of publish_result(); - // It does not perform any checks, like if the goal exists - uint64_t ipc_action_client_id = - ipm->get_action_client_id_from_goal_uuid(hashed_uuid); - - auto typed_response = std::static_pointer_cast(result_message); - ipm->template intra_process_action_send_result_response( - 
ipc_action_client_id, - std::move(typed_response)); - - // This part would be the IPC version of publish_status(); - auto status_msg = shared_this->get_status_array(); - - ipm->template intra_process_action_publish_status( - ipc_action_client_id, - std::move(status_msg)); - - ipm->remove_intra_process_action_client_goal_uuid(hashed_uuid); - } else { + send_inter_process_needed = shared_this->ipc_on_terminal_state(goal_uuid, result_message); + } + if (send_inter_process_needed) { // Send result message to anyone that asked shared_this->publish_result(goal_uuid, result_message); // Publish a status message any time a goal handle changes state @@ -701,7 +759,7 @@ class Server : public ServerBase, public std::enable_shared_from_thisnotify_goal_terminal_state(); // Delete data now (ServerBase and rcl_action_server_t keep data until goal handle expires) - std::lock_guard lock(shared_this->goal_handles_mutex_); + std::lock_guard lock(shared_this->goal_handles_mutex_); shared_this->goal_handles_.erase(goal_uuid); }; @@ -713,64 +771,30 @@ class Server : public ServerBase, public std::enable_shared_from_this lock(shared_this->ipc_mutex_); - - // Publish a status message any time a goal handle changes state + bool send_inter_process_needed = true; if (shared_this->use_intra_process_) { - auto ipm = shared_this->weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra_process_action_publish_status called " - "after destruction of intra process manager"); - } - - size_t hashed_uuid = std::hash()(goal_uuid); - - uint64_t ipc_action_client_id = - ipm->get_action_client_id_from_goal_uuid(hashed_uuid); - - // This part would be the IPC version of publish_status(); - auto status_msg = shared_this->get_status_array(); - - ipm->template intra_process_action_publish_status( - ipc_action_client_id, - std::move(status_msg)); - - } else { + send_inter_process_needed = shared_this->ipc_on_executing(goal_uuid); + } + if (send_inter_process_needed) { 
shared_this->publish_status(); } }; - - using FeedbackMsg = typename ActionT::Impl::FeedbackMessage; + using FeedbackMsg = typename ActionT::Impl::FeedbackMessage::SharedPtr; auto publish_feedback = - [weak_this](typename std::shared_ptr feedback_msg) + [weak_this](FeedbackMsg feedback_msg) { std::shared_ptr> shared_this = weak_this.lock(); if (!shared_this) { return; } - std::lock_guard lock(shared_this->ipc_mutex_); - + bool send_inter_process_needed = true; if (shared_this->use_intra_process_) { - auto ipm = shared_this->weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra_process_action_publish_feedback called " - "after destruction of intra process manager"); - } - - size_t hashed_uuid = std::hash()(feedback_msg->goal_id.uuid); - - uint64_t ipc_action_client_id = - ipm->get_action_client_id_from_goal_uuid(hashed_uuid); - - ipm->template intra_process_action_publish_feedback( - ipc_action_client_id, - std::move(feedback_msg)); - } else { + send_inter_process_needed = shared_this->ipc_publish_feedback(feedback_msg); + } + if (send_inter_process_needed) { shared_this->publish_feedback(std::static_pointer_cast(feedback_msg)); } }; @@ -782,7 +806,7 @@ class Server : public ServerBase, public std::enable_shared_from_this( rcl_goal_handle, uuid, goal, on_terminal_state, on_executing, publish_feedback)); { - std::lock_guard lock(goal_handles_mutex_); + std::lock_guard lock(goal_handles_mutex_); goal_handles_[uuid] = goal_handle; } handle_accepted_(goal_handle); @@ -809,7 +833,7 @@ class Server : public ServerBase, public std::enable_shared_from_this> goal_handle; { - std::lock_guard lock(goal_handles_mutex_); + std::lock_guard lock(goal_handles_mutex_); auto element = goal_handles_.find(uuid); if (element != goal_handles_.end()) { goal_handle = element->second.lock(); @@ -884,13 +908,19 @@ class Server : public ServerBase, public std::enable_shared_from_this goal_handles_; - std::mutex goal_handles_mutex_; + std::recursive_mutex goal_handles_mutex_; + 
std::string action_name_; + // Declare the intra-process action server using ActionServerIntraProcessT = rclcpp::experimental::ActionServerIntraProcess; std::shared_ptr ipc_action_server_; - std::recursive_mutex ipc_mutex_; bool use_intra_process_{false}; + // Map to store results until the client requests it + std::unordered_map ipc_goal_results_; + // The goals for which the client has requested the result + std::vector goals_result_requested_; + void create_intra_process_action_server( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base, @@ -925,21 +955,21 @@ class Server : public ServerBase, public std::enable_shared_from_this lock(ipc_mutex_); + std::lock_guard lock(goal_handles_mutex_); use_intra_process_ = true; - std::string remapped_action_name = node_base->resolve_topic_or_service_name(name, true); + action_name_ = node_base->resolve_topic_or_service_name(name, true); // Create an ActionClientIntraProcess which will be given // to the intra-process manager. auto context = node_base->get_context(); ipc_action_server_ = std::make_shared( - context, remapped_action_name, qos_history, + context, action_name_, qos_history, std::bind(&Server::ipc_execute_goal_request_received, this, std::placeholders::_1), std::bind(&Server::ipc_execute_cancel_request_received, this, std::placeholders::_1), std::bind(&Server::ipc_execute_result_request_received, this, std::placeholders::_1)); diff --git a/rclcpp_action/include/rclcpp_action/types.hpp b/rclcpp_action/include/rclcpp_action/types.hpp index 4f8548ff76..c212044848 100644 --- a/rclcpp_action/include/rclcpp_action/types.hpp +++ b/rclcpp_action/include/rclcpp_action/types.hpp @@ -69,14 +69,13 @@ struct hash { size_t operator()(const rclcpp_action::GoalUUID & uuid) const noexcept { - // TODO(sloretz) Use someone else's hash function and cite it - size_t result = 0; - for (size_t i = 0; i < uuid.size(); ++i) { - for (size_t b = 0; b < sizeof(size_t); ++b) { - size_t part = uuid[i]; - part <<= CHAR_BIT * b; -
result ^= part; - } + // Using the FNV-1a hash algorithm + constexpr size_t FNV_prime = 1099511628211u; + size_t result = 14695981039346656037u; + + for (const auto & byte : uuid) { + result ^= byte; + result *= FNV_prime; } return result; } diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp index a2df68295e..16702dfe82 100644 --- a/rclcpp_action/src/client.cpp +++ b/rclcpp_action/src/client.cpp @@ -550,21 +550,7 @@ ClientBase::clear_on_ready_callback() std::shared_ptr ClientBase::take_data() { - if (pimpl_->is_feedback_ready) { - std::shared_ptr feedback_message = this->create_feedback_message(); - rcl_ret_t ret = rcl_action_take_feedback( - pimpl_->client_handle.get(), feedback_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, feedback_message)); - } else if (pimpl_->is_status_ready) { - std::shared_ptr status_message = this->create_status_message(); - rcl_ret_t ret = rcl_action_take_status( - pimpl_->client_handle.get(), status_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, status_message)); - } else if (pimpl_->is_goal_response_ready) { + if (pimpl_->is_goal_response_ready) { rmw_request_id_t response_header; std::shared_ptr goal_response = this->create_goal_response(); rcl_ret_t ret = rcl_action_take_goal_response( @@ -588,6 +574,20 @@ ClientBase::take_data() return std::static_pointer_cast( std::make_shared>>( ret, response_header, cancel_response)); + } else if (pimpl_->is_feedback_ready) { + std::shared_ptr feedback_message = this->create_feedback_message(); + rcl_ret_t ret = rcl_action_take_feedback( + pimpl_->client_handle.get(), feedback_message.get()); + return std::static_pointer_cast( + std::make_shared>>( + ret, feedback_message)); + } else if (pimpl_->is_status_ready) { + std::shared_ptr status_message = this->create_status_message(); + rcl_ret_t ret = rcl_action_take_status( + pimpl_->client_handle.get(), status_message.get()); + return std::static_pointer_cast( + 
std::make_shared>>( + ret, status_message)); } else { throw std::runtime_error("Taking data from action client but nothing is ready"); } @@ -625,27 +625,7 @@ ClientBase::execute(std::shared_ptr & data) throw std::runtime_error("'data' is empty"); } - if (pimpl_->is_feedback_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_feedback_ready = false; - if (RCL_RET_OK == ret) { - auto feedback_message = std::get<1>(*shared_ptr); - this->handle_feedback_message(feedback_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback"); - } - } else if (pimpl_->is_status_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_status_ready = false; - if (RCL_RET_OK == ret) { - auto status_message = std::get<1>(*shared_ptr); - this->handle_status_message(status_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status"); - } - } else if (pimpl_->is_goal_response_ready) { + if (pimpl_->is_goal_response_ready) { auto shared_ptr = std::static_pointer_cast< std::tuple>>(data); auto ret = std::get<0>(*shared_ptr); @@ -681,6 +661,26 @@ ClientBase::execute(std::shared_ptr & data) } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response"); } + } else if (pimpl_->is_feedback_ready) { + auto shared_ptr = std::static_pointer_cast>>(data); + auto ret = std::get<0>(*shared_ptr); + pimpl_->is_feedback_ready = false; + if (RCL_RET_OK == ret) { + auto feedback_message = std::get<1>(*shared_ptr); + this->handle_feedback_message(feedback_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback"); + } + } else if (pimpl_->is_status_ready) { + auto shared_ptr = 
std::static_pointer_cast>>(data); + auto ret = std::get<0>(*shared_ptr); + pimpl_->is_status_ready = false; + if (RCL_RET_OK == ret) { + auto status_message = std::get<1>(*shared_ptr); + this->handle_status_message(status_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status"); + } } else { throw std::runtime_error("Executing action client but nothing is ready"); } @@ -699,8 +699,6 @@ ClientBase::setup_intra_process( rclcpp::Waitable::SharedPtr ClientBase::get_intra_process_waitable() { - std::lock_guard lock(ipc_mutex_); - // If not using intra process, shortcut to nullptr. if (!use_intra_process_) { return nullptr; diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index c17d73b18e..c7c6f24778 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -569,18 +569,6 @@ ServerBase::execute_result_request_received(std::shared_ptr & data) data.reset(); } -// Todo: Use an intra-process way to store goal_results, when using IPC -std::shared_ptr -ServerBase::get_result_response(GoalUUID uuid) -{ - std::lock_guard lock(pimpl_->unordered_map_mutex_); - auto iter = pimpl_->goal_results_.find(uuid); - if (iter != pimpl_->goal_results_.end()) { - return iter->second; - } - return nullptr; -} - void ServerBase::execute_check_expired_goals() { @@ -889,8 +877,6 @@ ServerBase::setup_intra_process( rclcpp::Waitable::SharedPtr ServerBase::get_intra_process_waitable() { - std::lock_guard lock(ipc_mutex_); - // If not using intra process, shortcut to nullptr. 
if (!use_intra_process_) { return nullptr; diff --git a/rclcpp_action/test/test_types.cpp b/rclcpp_action/test/test_types.cpp index 7c652aaad6..6a88cd4883 100644 --- a/rclcpp_action/test/test_types.cpp +++ b/rclcpp_action/test/test_types.cpp @@ -15,6 +15,7 @@ #include #include +#include #include "rclcpp_action/types.hpp" TEST(TestActionTypes, goal_uuid_to_string) { @@ -59,3 +60,35 @@ TEST(TestActionTypes, rcl_action_goal_info_to_goal_uuid) { EXPECT_EQ(goal_info.goal_id.uuid[i], goal_id[i]); } } + +TEST(TestActionTypes, goal_uuid_to_hashed_uuid_random) { + // Use std::random_device to seed the generator of goal IDs. + std::random_device rd; + std::independent_bits_engine< + std::default_random_engine, 8, decltype(rd())> random_bytes_generator(rd()); + + std::vector hashed_guuids; + constexpr size_t iterations = 1000; + + for (size_t i = 0; i < iterations; i++) { + rclcpp_action::GoalUUID goal_id; + + // Generate random bytes for each element of the array + for (auto & element : goal_id) { + element = static_cast(random_bytes_generator()); + } + + size_t new_hashed_guuid = std::hash()(goal_id); + + // Search for any previous hashed goal_id with the same value + for (auto prev_hashed_guuid : hashed_guuids) { + EXPECT_NE(prev_hashed_guuid, new_hashed_guuid); + if (prev_hashed_guuid == new_hashed_guuid) { + // Fail before the first occurrence of a collision + GTEST_FAIL(); + } + } + + hashed_guuids.push_back(new_hashed_guuid); + } +}