Skip to content

Commit 17a8555

Browse files
author
Mauro Passerino
committed
Support intra-process communication: Clients & Services
Signed-off-by: Mauro Passerino <[email protected]>
1 parent 0f58bb8 commit 17a8555

29 files changed

+1370
-63
lines changed

rclcpp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ set(${PROJECT_NAME}_SRCS
3838
src/rclcpp/any_executable.cpp
3939
src/rclcpp/callback_group.cpp
4040
src/rclcpp/client.cpp
41+
src/rclcpp/client_intra_process_base.cpp
4142
src/rclcpp/clock.cpp
4243
src/rclcpp/context.cpp
4344
src/rclcpp/contexts/default_context.cpp
@@ -97,6 +98,7 @@ set(${PROJECT_NAME}_SRCS
9798
src/rclcpp/serialization.cpp
9899
src/rclcpp/serialized_message.cpp
99100
src/rclcpp/service.cpp
101+
src/rclcpp/service_intra_process_base.cpp
100102
src/rclcpp/signal_handler.cpp
101103
src/rclcpp/subscription_base.cpp
102104
src/rclcpp/subscription_intra_process_base.cpp

rclcpp/include/rclcpp/client.hpp

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
3737
#include "rclcpp/exceptions.hpp"
3838
#include "rclcpp/expand_topic_or_service_name.hpp"
39+
#include "rclcpp/experimental/client_intra_process.hpp"
40+
#include "rclcpp/experimental/intra_process_manager.hpp"
3941
#include "rclcpp/function_traits.hpp"
42+
#include "rclcpp/intra_process_setting.hpp"
4043
#include "rclcpp/logging.hpp"
4144
#include "rclcpp/macros.hpp"
4245
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
@@ -45,6 +48,8 @@
4548
#include "rclcpp/utilities.hpp"
4649
#include "rclcpp/visibility_control.hpp"
4750

51+
#include "rcutils/logging_macros.h"
52+
4853
#include "rmw/error_handling.h"
4954
#include "rmw/impl/cpp/demangle.hpp"
5055
#include "rmw/rmw.h"
@@ -251,6 +256,25 @@ class ClientBase
251256
rclcpp::QoS
252257
get_response_subscription_actual_qos() const;
253258

259+
using IntraProcessManagerWeakPtr =
260+
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
261+
262+
/// Implementation detail.
263+
RCLCPP_PUBLIC
264+
void
265+
setup_intra_process(
266+
uint64_t intra_process_client_id,
267+
IntraProcessManagerWeakPtr weak_ipm);
268+
269+
/// Return the waitable for intra-process
270+
/**
271+
* \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
272+
* \throws std::runtime_error if the intra process manager is destroyed
273+
*/
274+
RCLCPP_PUBLIC
275+
rclcpp::Waitable::SharedPtr
276+
get_intra_process_waitable() const;
277+
254278
/// Set a callback to be called when each new response is received.
255279
/**
256280
* The callback receives a size_t which is the number of responses received
@@ -363,6 +387,10 @@ class ClientBase
363387

364388
std::atomic<bool> in_use_by_wait_set_{false};
365389

390+
bool use_intra_process_{false};
391+
IntraProcessManagerWeakPtr weak_ipm_;
392+
uint64_t intra_process_client_id_;
393+
366394
std::recursive_mutex callback_mutex_;
367395
std::function<void(size_t)> on_new_response_callback_{nullptr};
368396
};
@@ -460,12 +488,14 @@ class Client : public ClientBase
460488
* \param[in] node_graph The node graph interface of the corresponding node.
461489
* \param[in] service_name Name of the topic to publish to.
462490
* \param[in] client_options options for the subscription.
491+
* \param[in] ipc_setting Intra-process communication setting for the client.
463492
*/
464493
Client(
465494
rclcpp::node_interfaces::NodeBaseInterface * node_base,
466495
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
467496
const std::string & service_name,
468-
rcl_client_options_t & client_options)
497+
rcl_client_options_t & client_options,
498+
rclcpp::IntraProcessSetting ipc_setting)
469499
: ClientBase(node_base, node_graph)
470500
{
471501
using rosidl_typesupport_cpp::get_service_type_support_handle;
@@ -490,6 +520,37 @@ class Client : public ClientBase
490520
}
491521
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
492522
}
523+
524+
// Setup intra process if requested.
525+
if (ipc_setting == IntraProcessSetting::Enable) {
526+
// Check if the QoS is compatible with intra-process.
527+
auto qos_profile = get_response_subscription_actual_qos();
528+
529+
if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
530+
throw std::invalid_argument(
531+
"intraprocess communication allowed only with keep last history qos policy");
532+
}
533+
if (qos_profile.depth() == 0) {
534+
throw std::invalid_argument(
535+
"intraprocess communication is not allowed with 0 depth qos policy");
536+
}
537+
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
538+
throw std::invalid_argument(
539+
"intraprocess communication allowed only with volatile durability");
540+
}
541+
542+
// Create a ClientIntraProcess which will be given to the intra-process manager.
543+
client_intra_process_ = std::make_shared<ClientIntraProcessT>(
544+
context_,
545+
this->get_service_name(),
546+
qos_profile);
547+
548+
// Add it to the intra process manager.
549+
using rclcpp::experimental::IntraProcessManager;
550+
auto ipm = context_->get_sub_context<IntraProcessManager>();
551+
uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_);
552+
this->setup_intra_process(intra_process_client_id, ipm);
553+
}
493554
}
494555

495556
virtual ~Client()
@@ -610,7 +671,7 @@ class Client : public ClientBase
610671
Promise promise;
611672
auto future = promise.get_future();
612673
auto req_id = async_send_request_impl(
613-
*request,
674+
std::move(request),
614675
std::move(promise));
615676
return FutureAndRequestId(std::move(future), req_id);
616677
}
@@ -645,7 +706,7 @@ class Client : public ClientBase
645706
Promise promise;
646707
auto shared_future = promise.get_future().share();
647708
auto req_id = async_send_request_impl(
648-
*request,
709+
std::move(request),
649710
std::make_tuple(
650711
CallbackType{std::forward<CallbackT>(cb)},
651712
shared_future,
@@ -676,7 +737,7 @@ class Client : public ClientBase
676737
PromiseWithRequest promise;
677738
auto shared_future = promise.get_future().share();
678739
auto req_id = async_send_request_impl(
679-
*request,
740+
request,
680741
std::make_tuple(
681742
CallbackWithRequestType{std::forward<CallbackT>(cb)},
682743
request,
@@ -789,10 +850,31 @@ class Client : public ClientBase
789850
CallbackWithRequestTypeValueVariant>;
790851

791852
int64_t
792-
async_send_request_impl(const Request & request, CallbackInfoVariant value)
853+
async_send_request_impl(SharedRequest request, CallbackInfoVariant value)
793854
{
855+
if (use_intra_process_) {
856+
auto ipm = weak_ipm_.lock();
857+
if (!ipm) {
858+
throw std::runtime_error(
859+
"intra process send called after destruction of intra process manager");
860+
}
861+
bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_);
862+
863+
// Check if there's an intra-process server available matching this client.
864+
// If there's not, we fall back into inter-process communication, since
865+
// the server might be available in another process or was configured to not use IPC.
866+
if (intra_process_server_available) {
867+
// Send intra-process request
868+
ipm->send_intra_process_client_request<ServiceT>(
869+
intra_process_client_id_,
870+
std::make_pair(std::move(request), std::move(value)));
871+
return ipc_sequence_number_++;
872+
}
873+
}
874+
875+
// Send inter-process request
794876
int64_t sequence_number;
795-
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
877+
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
796878
if (RCL_RET_OK != ret) {
797879
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
798880
}
@@ -830,6 +912,11 @@ class Client : public ClientBase
830912
CallbackInfoVariant>>
831913
pending_requests_;
832914
std::mutex pending_requests_mutex_;
915+
916+
private:
917+
using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;
918+
std::shared_ptr<ClientIntraProcessT> client_intra_process_;
919+
std::atomic_uint ipc_sequence_number_{1};
833920
};
834921

835922
} // namespace rclcpp

rclcpp/include/rclcpp/create_client.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ create_client(
3535
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
3636
const std::string & service_name,
3737
const rmw_qos_profile_t & qos_profile,
38-
rclcpp::CallbackGroup::SharedPtr group)
38+
rclcpp::CallbackGroup::SharedPtr group,
39+
rclcpp::IntraProcessSetting ipc_setting)
3940
{
4041
rcl_client_options_t options = rcl_client_get_default_options();
4142
options.qos = qos_profile;
@@ -44,7 +45,8 @@ create_client(
4445
node_base.get(),
4546
node_graph,
4647
service_name,
47-
options);
48+
options,
49+
ipc_setting);
4850

4951
auto cli_base_ptr = std::dynamic_pointer_cast<rclcpp::ClientBase>(cli);
5052
node_services->add_client(cli_base_ptr, group);

rclcpp/include/rclcpp/create_service.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ create_service(
3737
const std::string & service_name,
3838
CallbackT && callback,
3939
const rmw_qos_profile_t & qos_profile,
40-
rclcpp::CallbackGroup::SharedPtr group)
40+
rclcpp::CallbackGroup::SharedPtr group,
41+
rclcpp::IntraProcessSetting ipc_setting)
4142
{
4243
rclcpp::AnyServiceCallback<ServiceT> any_service_callback;
4344
any_service_callback.set(std::forward<CallbackT>(callback));
@@ -46,8 +47,8 @@ create_service(
4647
service_options.qos = qos_profile;
4748

4849
auto serv = Service<ServiceT>::make_shared(
49-
node_base->get_shared_rcl_node_handle(),
50-
service_name, any_service_callback, service_options);
50+
node_base,
51+
service_name, any_service_callback, service_options, ipc_setting);
5152
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
5253
node_services->add_service(serv_base_ptr, group);
5354
return serv;

rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,54 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
234234
}
235235
};
236236

237+
template<typename BufferT>
238+
class ServiceIntraProcessBuffer : public IntraProcessBufferBase
239+
{
240+
public:
241+
RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBuffer)
242+
243+
virtual ~ServiceIntraProcessBuffer() {}
244+
245+
explicit
246+
ServiceIntraProcessBuffer(
247+
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_impl)
248+
{
249+
buffer_ = std::move(buffer_impl);
250+
}
251+
252+
// Not to be used in this class. Todo: review base class to avoid this.
253+
bool use_take_shared_method() const override
254+
{
255+
throw std::runtime_error(
256+
"use_take_shared_method not intended to be used in this class");
257+
return false;
258+
}
259+
260+
bool has_data() const override
261+
{
262+
return buffer_->has_data();
263+
}
264+
265+
void clear() override
266+
{
267+
buffer_->clear();
268+
}
269+
270+
void add(BufferT && msg)
271+
{
272+
buffer_->enqueue(std::move(msg));
273+
}
274+
275+
BufferT
276+
consume()
277+
{
278+
return buffer_->dequeue();
279+
}
280+
281+
private:
282+
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;
283+
};
284+
237285
} // namespace buffers
238286
} // namespace experimental
239287
} // namespace rclcpp

0 commit comments

Comments
 (0)