3737
3838#include " rclcpp/clock.hpp"
3939#include " rclcpp/detail/cpp_callback_trampoline.hpp"
40+ #include " rclcpp/detail/resolve_use_intra_process.hpp"
4041#include " rclcpp/exceptions.hpp"
4142#include " rclcpp/expand_topic_or_service_name.hpp"
43+ #include " rclcpp/experimental/client_intra_process.hpp"
44+ #include " rclcpp/experimental/intra_process_manager.hpp"
4245#include " rclcpp/function_traits.hpp"
46+ #include " rclcpp/intra_process_setting.hpp"
4347#include " rclcpp/logging.hpp"
4448#include " rclcpp/macros.hpp"
4549#include " rclcpp/node_interfaces/node_graph_interface.hpp"
4852#include " rclcpp/utilities.hpp"
4953#include " rclcpp/visibility_control.hpp"
5054
55+ #include " rcutils/logging_macros.h"
56+
5157#include " rmw/error_handling.h"
5258#include " rmw/impl/cpp/demangle.hpp"
5359#include " rmw/rmw.h"
@@ -277,6 +283,15 @@ class ClientBase
277283 rclcpp::QoS
278284 get_response_subscription_actual_qos () const ;
279285
286+ // / Return the waitable for intra-process
287+ /* *
288+ * \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
289+ * \throws std::runtime_error if the intra process manager is destroyed
290+ */
291+ RCLCPP_PUBLIC
292+ rclcpp::Waitable::SharedPtr
293+ get_intra_process_waitable ();
294+
280295 // / Set a callback to be called when each new response is received.
281296 /* *
282297 * The callback receives a size_t which is the number of responses received
@@ -381,6 +396,19 @@ class ClientBase
381396 void
382397 set_on_new_response_callback (rcl_event_callback_t callback, const void * user_data);
383398
399+ using IntraProcessManagerWeakPtr =
400+ std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
401+
402+ // / Implementation detail.
403+ RCLCPP_PUBLIC
404+ void
405+ setup_intra_process (
406+ uint64_t intra_process_client_id,
407+ IntraProcessManagerWeakPtr weak_ipm);
408+
409+ std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
410+ std::atomic_uint ipc_sequence_number_{1 };
411+
384412 rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
385413 std::shared_ptr<rcl_node_t > node_handle_;
386414 std::shared_ptr<rclcpp::Context> context_;
@@ -396,6 +424,11 @@ class ClientBase
396424 std::shared_ptr<rcl_client_t > client_handle_;
397425
398426 std::atomic<bool > in_use_by_wait_set_{false };
427+
428+ std::recursive_mutex ipc_mutex_;
429+ bool use_intra_process_{false };
430+ IntraProcessManagerWeakPtr weak_ipm_;
431+ uint64_t intra_process_client_id_;
399432};
400433
401434template <typename ServiceT>
@@ -491,12 +524,14 @@ class Client : public ClientBase
491524 * \param[in] node_graph The node graph interface of the corresponding node.
492525 * \param[in] service_name Name of the topic to publish to.
493526 * \param[in] client_options options for the subscription.
527+ * \param[in] ipc_setting Intra-process communication setting for the client.
494528 */
495529 Client (
496530 rclcpp::node_interfaces::NodeBaseInterface * node_base,
497531 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
498532 const std::string & service_name,
499- rcl_client_options_t & client_options)
533+ rcl_client_options_t & client_options,
534+ rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
500535 : ClientBase(node_base, node_graph),
501536 srv_type_support_handle_ (rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
502537 {
@@ -519,10 +554,27 @@ class Client : public ClientBase
519554 }
520555 rclcpp::exceptions::throw_from_rcl_error (ret, " could not create client" );
521556 }
557+
558+ // Setup intra process if requested.
559+ if (rclcpp::detail::resolve_use_intra_process (ipc_setting, *node_base)) {
560+ create_intra_process_client ();
561+ }
522562 }
523563
524564 virtual ~Client ()
525565 {
566+ if (!use_intra_process_) {
567+ return ;
568+ }
569+ auto ipm = weak_ipm_.lock ();
570+ if (!ipm) {
571+ // TODO(ivanpauno): should this raise an error?
572+ RCLCPP_WARN (
573+ rclcpp::get_logger (" rclcpp" ),
574+ " Intra process manager died before than a client." );
575+ return ;
576+ }
577+ ipm->remove_client (intra_process_client_id_);
526578 }
527579
528580 // / Take the next response for this client.
@@ -639,7 +691,7 @@ class Client : public ClientBase
639691 Promise promise;
640692 auto future = promise.get_future ();
641693 auto req_id = async_send_request_impl (
642- * request,
694+ std::move ( request) ,
643695 std::move (promise));
644696 return FutureAndRequestId (std::move (future), req_id);
645697 }
@@ -674,7 +726,7 @@ class Client : public ClientBase
674726 Promise promise;
675727 auto shared_future = promise.get_future ().share ();
676728 auto req_id = async_send_request_impl (
677- * request,
729+ std::move ( request) ,
678730 std::make_tuple (
679731 CallbackType{std::forward<CallbackT>(cb)},
680732 shared_future,
@@ -705,7 +757,7 @@ class Client : public ClientBase
705757 PromiseWithRequest promise;
706758 auto shared_future = promise.get_future ().share ();
707759 auto req_id = async_send_request_impl (
708- * request,
760+ request,
709761 std::make_tuple (
710762 CallbackWithRequestType{std::forward<CallbackT>(cb)},
711763 request,
@@ -839,11 +891,33 @@ class Client : public ClientBase
839891 CallbackWithRequestTypeValueVariant>;
840892
841893 int64_t
842- async_send_request_impl (const Request & request, CallbackInfoVariant value)
894+ async_send_request_impl (SharedRequest request, CallbackInfoVariant value)
843895 {
896+ std::lock_guard<std::recursive_mutex> ipc_lock (ipc_mutex_);
897+ if (use_intra_process_) {
898+ auto ipm = weak_ipm_.lock ();
899+ if (!ipm) {
900+ throw std::runtime_error (
901+ " intra process send called after destruction of intra process manager" );
902+ }
903+ bool intra_process_server_available = ipm->service_is_available (intra_process_client_id_);
904+
905+ // Check if there's an intra-process server available matching this client.
906+ // If there's not, we fall back into inter-process communication, since
907+ // the server might be available in another process or was configured to not use IPC.
908+ if (intra_process_server_available) {
909+ // Send intra-process request
910+ ipm->send_intra_process_client_request <ServiceT>(
911+ intra_process_client_id_,
912+ std::make_pair (std::move (request), std::move (value)));
913+ return ipc_sequence_number_++;
914+ }
915+ }
916+
917+ // Send inter-process request
844918 int64_t sequence_number;
845919 std::lock_guard<std::mutex> lock (pending_requests_mutex_);
846- rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), & request, &sequence_number);
920+ rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), request. get () , &sequence_number);
847921 if (RCL_RET_OK != ret) {
848922 rclcpp::exceptions::throw_from_rcl_error (ret, " failed to send request" );
849923 }
@@ -869,6 +943,40 @@ class Client : public ClientBase
869943 return value;
870944 }
871945
946+ void
947+ create_intra_process_client ()
948+ {
949+ // Check if the QoS is compatible with intra-process.
950+ auto qos_profile = get_response_subscription_actual_qos ();
951+
952+ if (qos_profile.history () != rclcpp::HistoryPolicy::KeepLast) {
953+ throw std::invalid_argument (
954+ " intraprocess communication allowed only with keep last history qos policy" );
955+ }
956+ if (qos_profile.depth () == 0 ) {
957+ throw std::invalid_argument (
958+ " intraprocess communication is not allowed with 0 depth qos policy" );
959+ }
960+ if (qos_profile.durability () != rclcpp::DurabilityPolicy::Volatile) {
961+ throw std::invalid_argument (
962+ " intraprocess communication allowed only with volatile durability" );
963+ }
964+
965+ // Create a ClientIntraProcess which will be given to the intra-process manager.
966+ using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;
967+
968+ client_intra_process_ = std::make_shared<ClientIntraProcessT>(
969+ context_,
970+ this ->get_service_name (),
971+ qos_profile);
972+
973+ // Add it to the intra process manager.
974+ using rclcpp::experimental::IntraProcessManager;
975+ auto ipm = context_->get_sub_context <IntraProcessManager>();
976+ uint64_t intra_process_client_id = ipm->add_intra_process_client (client_intra_process_);
977+ this ->setup_intra_process (intra_process_client_id, ipm);
978+ }
979+
872980 RCLCPP_DISABLE_COPY (Client)
873981
874982 std::unordered_map<
0 commit comments