3737
3838#include " rclcpp/clock.hpp"
3939#include " rclcpp/detail/cpp_callback_trampoline.hpp"
40+ #include " rclcpp/detail/resolve_use_intra_process.hpp"
4041#include " rclcpp/exceptions.hpp"
4142#include " rclcpp/expand_topic_or_service_name.hpp"
43+ #include " rclcpp/experimental/client_intra_process.hpp"
44+ #include " rclcpp/experimental/intra_process_manager.hpp"
4245#include " rclcpp/function_traits.hpp"
46+ #include " rclcpp/intra_process_setting.hpp"
4347#include " rclcpp/logging.hpp"
4448#include " rclcpp/macros.hpp"
4549#include " rclcpp/node_interfaces/node_graph_interface.hpp"
4852#include " rclcpp/utilities.hpp"
4953#include " rclcpp/visibility_control.hpp"
5054
55+ #include " rcutils/logging_macros.h"
56+
5157#include " rmw/error_handling.h"
5258#include " rmw/impl/cpp/demangle.hpp"
5359#include " rmw/rmw.h"
@@ -254,6 +260,15 @@ class ClientBase
254260 rclcpp::QoS
255261 get_response_subscription_actual_qos () const ;
256262
263+ // / Return the waitable for intra-process
264+ /* *
265+ * \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
266+ * \throws std::runtime_error if the intra process manager is destroyed
267+ */
268+ RCLCPP_PUBLIC
269+ rclcpp::Waitable::SharedPtr
270+ get_intra_process_waitable ();
271+
257272 // / Set a callback to be called when each new response is received.
258273 /* *
259274 * The callback receives a size_t which is the number of responses received
@@ -358,6 +373,19 @@ class ClientBase
358373 void
359374 set_on_new_response_callback (rcl_event_callback_t callback, const void * user_data);
360375
376+ using IntraProcessManagerWeakPtr =
377+ std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
378+
379+ // / Implementation detail.
380+ RCLCPP_PUBLIC
381+ void
382+ setup_intra_process (
383+ uint64_t intra_process_client_id,
384+ IntraProcessManagerWeakPtr weak_ipm);
385+
386+ std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
387+ std::atomic_uint ipc_sequence_number_{1 };
388+
361389 rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
362390 std::shared_ptr<rcl_node_t > node_handle_;
363391 std::shared_ptr<rclcpp::Context> context_;
@@ -373,6 +401,11 @@ class ClientBase
373401 std::shared_ptr<rcl_client_t > client_handle_;
374402
375403 std::atomic<bool > in_use_by_wait_set_{false };
404+
405+ std::recursive_mutex ipc_mutex_;
406+ bool use_intra_process_{false };
407+ IntraProcessManagerWeakPtr weak_ipm_;
408+ uint64_t intra_process_client_id_;
376409};
377410
378411template <typename ServiceT>
@@ -468,12 +501,14 @@ class Client : public ClientBase
468501 * \param[in] node_graph The node graph interface of the corresponding node.
469502 * \param[in] service_name Name of the topic to publish to.
470503 * \param[in] client_options options for the subscription.
504+ * \param[in] ipc_setting Intra-process communication setting for the client.
471505 */
472506 Client (
473507 rclcpp::node_interfaces::NodeBaseInterface * node_base,
474508 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
475509 const std::string & service_name,
476- rcl_client_options_t & client_options)
510+ rcl_client_options_t & client_options,
511+ rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
477512 : ClientBase(node_base, node_graph),
478513 srv_type_support_handle_ (rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
479514 {
@@ -496,10 +531,27 @@ class Client : public ClientBase
496531 }
497532 rclcpp::exceptions::throw_from_rcl_error (ret, " could not create client" );
498533 }
534+
535+ // Setup intra process if requested.
536+ if (rclcpp::detail::resolve_use_intra_process (ipc_setting, *node_base)) {
537+ create_intra_process_client ();
538+ }
499539 }
500540
501541 virtual ~Client ()
502542 {
543+ if (!use_intra_process_) {
544+ return ;
545+ }
546+ auto ipm = weak_ipm_.lock ();
547+ if (!ipm) {
548+ // TODO(ivanpauno): should this raise an error?
549+ RCLCPP_WARN (
550+ rclcpp::get_logger (" rclcpp" ),
551+ " Intra process manager died before than a client." );
552+ return ;
553+ }
554+ ipm->remove_client (intra_process_client_id_);
503555 }
504556
505557 // / Take the next response for this client.
@@ -616,7 +668,7 @@ class Client : public ClientBase
616668 Promise promise;
617669 auto future = promise.get_future ();
618670 auto req_id = async_send_request_impl (
619- * request,
671+ std::move ( request) ,
620672 std::move (promise));
621673 return FutureAndRequestId (std::move (future), req_id);
622674 }
@@ -651,7 +703,7 @@ class Client : public ClientBase
651703 Promise promise;
652704 auto shared_future = promise.get_future ().share ();
653705 auto req_id = async_send_request_impl (
654- * request,
706+ std::move ( request) ,
655707 std::make_tuple (
656708 CallbackType{std::forward<CallbackT>(cb)},
657709 shared_future,
@@ -682,7 +734,7 @@ class Client : public ClientBase
682734 PromiseWithRequest promise;
683735 auto shared_future = promise.get_future ().share ();
684736 auto req_id = async_send_request_impl (
685- * request,
737+ request,
686738 std::make_tuple (
687739 CallbackWithRequestType{std::forward<CallbackT>(cb)},
688740 request,
@@ -824,11 +876,33 @@ class Client : public ClientBase
824876 CallbackWithRequestTypeValueVariant>;
825877
826878 int64_t
827- async_send_request_impl (const Request & request, CallbackInfoVariant value)
879+ async_send_request_impl (SharedRequest request, CallbackInfoVariant value)
828880 {
881+ std::lock_guard<std::recursive_mutex> ipc_lock (ipc_mutex_);
882+ if (use_intra_process_) {
883+ auto ipm = weak_ipm_.lock ();
884+ if (!ipm) {
885+ throw std::runtime_error (
886+ " intra process send called after destruction of intra process manager" );
887+ }
888+ bool intra_process_server_available = ipm->service_is_available (intra_process_client_id_);
889+
890+ // Check if there's an intra-process server available matching this client.
891+ // If there's not, we fall back into inter-process communication, since
892+ // the server might be available in another process or was configured to not use IPC.
893+ if (intra_process_server_available) {
894+ // Send intra-process request
895+ ipm->send_intra_process_client_request <ServiceT>(
896+ intra_process_client_id_,
897+ std::make_pair (std::move (request), std::move (value)));
898+ return ipc_sequence_number_++;
899+ }
900+ }
901+
902+ // Send inter-process request
829903 int64_t sequence_number;
830904 std::lock_guard<std::mutex> lock (pending_requests_mutex_);
831- rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), & request, &sequence_number);
905+ rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), request. get () , &sequence_number);
832906 if (RCL_RET_OK != ret) {
833907 rclcpp::exceptions::throw_from_rcl_error (ret, " failed to send request" );
834908 }
@@ -854,6 +928,40 @@ class Client : public ClientBase
854928 return value;
855929 }
856930
931+ void
932+ create_intra_process_client ()
933+ {
934+ // Check if the QoS is compatible with intra-process.
935+ auto qos_profile = get_response_subscription_actual_qos ();
936+
937+ if (qos_profile.history () != rclcpp::HistoryPolicy::KeepLast) {
938+ throw std::invalid_argument (
939+ " intraprocess communication allowed only with keep last history qos policy" );
940+ }
941+ if (qos_profile.depth () == 0 ) {
942+ throw std::invalid_argument (
943+ " intraprocess communication is not allowed with 0 depth qos policy" );
944+ }
945+ if (qos_profile.durability () != rclcpp::DurabilityPolicy::Volatile) {
946+ throw std::invalid_argument (
947+ " intraprocess communication allowed only with volatile durability" );
948+ }
949+
950+ // Create a ClientIntraProcess which will be given to the intra-process manager.
951+ using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;
952+
953+ client_intra_process_ = std::make_shared<ClientIntraProcessT>(
954+ context_,
955+ this ->get_service_name (),
956+ qos_profile);
957+
958+ // Add it to the intra process manager.
959+ using rclcpp::experimental::IntraProcessManager;
960+ auto ipm = context_->get_sub_context <IntraProcessManager>();
961+ uint64_t intra_process_client_id = ipm->add_intra_process_client (client_intra_process_);
962+ this ->setup_intra_process (intra_process_client_id, ipm);
963+ }
964+
857965 RCLCPP_DISABLE_COPY (Client)
858966
859967 std::unordered_map<
0 commit comments