Skip to content

Commit 2f48887

Browse files
Mauro PasserinoAlberto Soragna
authored andcommitted
Support intra-process communication: Clients & Services (ros2#1847)
Signed-off-by: Mauro Passerino <[email protected]>
1 parent 02d0ae8 commit 2f48887

32 files changed

+1677
-67
lines changed

rclcpp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS
3939
src/rclcpp/any_executable.cpp
4040
src/rclcpp/callback_group.cpp
4141
src/rclcpp/client.cpp
42+
src/rclcpp/client_intra_process_base.cpp
4243
src/rclcpp/clock.cpp
4344
src/rclcpp/context.cpp
4445
src/rclcpp/contexts/default_context.cpp
@@ -108,6 +109,7 @@ set(${PROJECT_NAME}_SRCS
108109
src/rclcpp/serialization.cpp
109110
src/rclcpp/serialized_message.cpp
110111
src/rclcpp/service.cpp
112+
src/rclcpp/service_intra_process_base.cpp
111113
src/rclcpp/signal_handler.cpp
112114
src/rclcpp/subscription_base.cpp
113115
src/rclcpp/subscription_intra_process_base.cpp

rclcpp/include/rclcpp/client.hpp

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,13 @@
3737

3838
#include "rclcpp/clock.hpp"
3939
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
40+
#include "rclcpp/detail/resolve_use_intra_process.hpp"
4041
#include "rclcpp/exceptions.hpp"
4142
#include "rclcpp/expand_topic_or_service_name.hpp"
43+
#include "rclcpp/experimental/client_intra_process.hpp"
44+
#include "rclcpp/experimental/intra_process_manager.hpp"
4245
#include "rclcpp/function_traits.hpp"
46+
#include "rclcpp/intra_process_setting.hpp"
4347
#include "rclcpp/logging.hpp"
4448
#include "rclcpp/macros.hpp"
4549
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
@@ -48,6 +52,8 @@
4852
#include "rclcpp/utilities.hpp"
4953
#include "rclcpp/visibility_control.hpp"
5054

55+
#include "rcutils/logging_macros.h"
56+
5157
#include "rmw/error_handling.h"
5258
#include "rmw/impl/cpp/demangle.hpp"
5359
#include "rmw/rmw.h"
@@ -133,7 +139,7 @@ class ClientBase
133139
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);
134140

135141
RCLCPP_PUBLIC
136-
virtual ~ClientBase() = default;
142+
virtual ~ClientBase();
137143

138144
/// Take the next response for this client as a type erased pointer.
139145
/**
@@ -254,6 +260,15 @@ class ClientBase
254260
rclcpp::QoS
255261
get_response_subscription_actual_qos() const;
256262

263+
/// Return the waitable for intra-process
264+
/**
265+
* \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
266+
* \throws std::runtime_error if the intra process manager is destroyed
267+
*/
268+
RCLCPP_PUBLIC
269+
rclcpp::Waitable::SharedPtr
270+
get_intra_process_waitable();
271+
257272
/// Set a callback to be called when each new response is received.
258273
/**
259274
* The callback receives a size_t which is the number of responses received
@@ -358,6 +373,19 @@ class ClientBase
358373
void
359374
set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);
360375

376+
using IntraProcessManagerWeakPtr =
377+
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
378+
379+
/// Implementation detail.
380+
RCLCPP_PUBLIC
381+
void
382+
setup_intra_process(
383+
uint64_t intra_process_client_id,
384+
IntraProcessManagerWeakPtr weak_ipm);
385+
386+
std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
387+
std::atomic_uint ipc_sequence_number_{1};
388+
361389
rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
362390
std::shared_ptr<rcl_node_t> node_handle_;
363391
std::shared_ptr<rclcpp::Context> context_;
@@ -373,6 +401,14 @@ class ClientBase
373401
std::shared_ptr<rcl_client_t> client_handle_;
374402

375403
std::atomic<bool> in_use_by_wait_set_{false};
404+
405+
std::recursive_mutex ipc_mutex_;
406+
bool use_intra_process_{false};
407+
IntraProcessManagerWeakPtr weak_ipm_;
408+
uint64_t intra_process_client_id_;
409+
410+
std::recursive_mutex callback_mutex_;
411+
std::function<void(size_t)> on_new_response_callback_{nullptr};
376412
};
377413

378414
template<typename ServiceT>
@@ -468,14 +504,16 @@ class Client : public ClientBase
468504
* \param[in] node_graph The node graph interface of the corresponding node.
469505
* \param[in] service_name Name of the topic to publish to.
470506
* \param[in] client_options options for the subscription.
507+
* \param[in] ipc_setting Intra-process communication setting for the client.
471508
*/
472509
Client(
473510
rclcpp::node_interfaces::NodeBaseInterface * node_base,
474511
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
475512
const std::string & service_name,
476-
rcl_client_options_t & client_options)
513+
rcl_client_options_t & client_options,
514+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
477515
: ClientBase(node_base, node_graph),
478-
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
516+
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
479517
{
480518
rcl_ret_t ret = rcl_client_init(
481519
this->get_client_handle().get(),
@@ -496,10 +534,27 @@ class Client : public ClientBase
496534
}
497535
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
498536
}
537+
538+
// Setup intra process if requested.
539+
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
540+
create_intra_process_client();
541+
}
499542
}
500543

501544
virtual ~Client()
502545
{
546+
if (!use_intra_process_) {
547+
return;
548+
}
549+
auto ipm = weak_ipm_.lock();
550+
if (!ipm) {
551+
// TODO(ivanpauno): should this raise an error?
552+
RCLCPP_WARN(
553+
rclcpp::get_logger("rclcpp"),
554+
"Intra process manager died before than a client.");
555+
return;
556+
}
557+
ipm->remove_client(intra_process_client_id_);
503558
}
504559

505560
/// Take the next response for this client.
@@ -616,7 +671,7 @@ class Client : public ClientBase
616671
Promise promise;
617672
auto future = promise.get_future();
618673
auto req_id = async_send_request_impl(
619-
*request,
674+
std::move(request),
620675
std::move(promise));
621676
return FutureAndRequestId(std::move(future), req_id);
622677
}
@@ -651,7 +706,7 @@ class Client : public ClientBase
651706
Promise promise;
652707
auto shared_future = promise.get_future().share();
653708
auto req_id = async_send_request_impl(
654-
*request,
709+
std::move(request),
655710
std::make_tuple(
656711
CallbackType{std::forward<CallbackT>(cb)},
657712
shared_future,
@@ -682,7 +737,7 @@ class Client : public ClientBase
682737
PromiseWithRequest promise;
683738
auto shared_future = promise.get_future().share();
684739
auto req_id = async_send_request_impl(
685-
*request,
740+
request,
686741
std::make_tuple(
687742
CallbackWithRequestType{std::forward<CallbackT>(cb)},
688743
request,
@@ -824,8 +879,30 @@ class Client : public ClientBase
824879
CallbackWithRequestTypeValueVariant>;
825880

826881
int64_t
827-
async_send_request_impl(const Request & request, CallbackInfoVariant value)
882+
async_send_request_impl(SharedRequest request, CallbackInfoVariant value)
828883
{
884+
std::lock_guard<std::recursive_mutex> lock(ipc_mutex_);
885+
if (use_intra_process_) {
886+
auto ipm = weak_ipm_.lock();
887+
if (!ipm) {
888+
throw std::runtime_error(
889+
"intra process send called after destruction of intra process manager");
890+
}
891+
bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_);
892+
893+
// Check if there's an intra-process server available matching this client.
894+
// If there's not, we fall back into inter-process communication, since
895+
// the server might be available in another process or was configured to not use IPC.
896+
if (intra_process_server_available) {
897+
// Send intra-process request
898+
ipm->send_intra_process_client_request<ServiceT>(
899+
intra_process_client_id_,
900+
std::make_pair(std::move(request), std::move(value)));
901+
return ipc_sequence_number_++;
902+
}
903+
}
904+
905+
// Send inter-process request
829906
int64_t sequence_number;
830907
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
831908
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
@@ -854,6 +931,40 @@ class Client : public ClientBase
854931
return value;
855932
}
856933

934+
void
935+
create_intra_process_client()
936+
{
937+
// Check if the QoS is compatible with intra-process.
938+
auto qos_profile = get_response_subscription_actual_qos();
939+
940+
if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
941+
throw std::invalid_argument(
942+
"intraprocess communication allowed only with keep last history qos policy");
943+
}
944+
if (qos_profile.depth() == 0) {
945+
throw std::invalid_argument(
946+
"intraprocess communication is not allowed with 0 depth qos policy");
947+
}
948+
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
949+
throw std::invalid_argument(
950+
"intraprocess communication allowed only with volatile durability");
951+
}
952+
953+
// Create a ClientIntraProcess which will be given to the intra-process manager.
954+
using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;
955+
956+
client_intra_process_ = std::make_shared<ClientIntraProcessT>(
957+
context_,
958+
this->get_service_name(),
959+
qos_profile);
960+
961+
// Add it to the intra process manager.
962+
using rclcpp::experimental::IntraProcessManager;
963+
auto ipm = context_->get_sub_context<IntraProcessManager>();
964+
uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_);
965+
this->setup_intra_process(intra_process_client_id, ipm);
966+
}
967+
857968
RCLCPP_DISABLE_COPY(Client)
858969

859970
std::unordered_map<

rclcpp/include/rclcpp/create_client.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ create_client(
6565
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
6666
const std::string & service_name,
6767
const rmw_qos_profile_t & qos_profile,
68-
rclcpp::CallbackGroup::SharedPtr group)
68+
rclcpp::CallbackGroup::SharedPtr group,
69+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
6970
{
7071
rcl_client_options_t options = rcl_client_get_default_options();
7172
options.qos = qos_profile;
@@ -74,7 +75,8 @@ create_client(
7475
node_base.get(),
7576
node_graph,
7677
service_name,
77-
options);
78+
options,
79+
ipc_setting);
7880

7981
auto cli_base_ptr = std::dynamic_pointer_cast<rclcpp::ClientBase>(cli);
8082
node_services->add_client(cli_base_ptr, group);

rclcpp/include/rclcpp/create_service.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ create_service(
6363
const std::string & service_name,
6464
CallbackT && callback,
6565
const rmw_qos_profile_t & qos_profile,
66-
rclcpp::CallbackGroup::SharedPtr group)
66+
rclcpp::CallbackGroup::SharedPtr group,
67+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
6768
{
6869
rclcpp::AnyServiceCallback<ServiceT> any_service_callback;
6970
any_service_callback.set(std::forward<CallbackT>(callback));
@@ -72,8 +73,8 @@ create_service(
7273
service_options.qos = qos_profile;
7374

7475
auto serv = Service<ServiceT>::make_shared(
75-
node_base->get_shared_rcl_node_handle(),
76-
service_name, any_service_callback, service_options);
76+
node_base,
77+
service_name, any_service_callback, service_options, ipc_setting);
7778
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
7879
node_services->add_service(serv_base_ptr, group);
7980
return serv;

rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ namespace detail
2626
{
2727

2828
/// Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
29-
template<typename OptionsT, typename NodeBaseT>
29+
template<typename NodeBaseT>
3030
bool
31-
resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base)
31+
resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base)
3232
{
3333
bool use_intra_process;
34-
switch (options.use_intra_process_comm) {
34+
switch (ipc_setting) {
3535
case IntraProcessSetting::Enable:
3636
use_intra_process = true;
3737
break;

rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,51 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
239239
}
240240
};
241241

242+
template<typename BufferT>
243+
class ServiceIntraProcessBuffer : public IntraProcessBufferBase
244+
{
245+
public:
246+
RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBuffer)
247+
248+
virtual ~ServiceIntraProcessBuffer() {}
249+
250+
explicit
251+
ServiceIntraProcessBuffer(
252+
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_impl)
253+
{
254+
buffer_ = std::move(buffer_impl);
255+
}
256+
257+
bool use_take_shared_method() const override
258+
{
259+
return false;
260+
}
261+
262+
bool has_data() const override
263+
{
264+
return buffer_->has_data();
265+
}
266+
267+
void clear() override
268+
{
269+
buffer_->clear();
270+
}
271+
272+
void add(BufferT && msg)
273+
{
274+
buffer_->enqueue(std::move(msg));
275+
}
276+
277+
BufferT
278+
consume()
279+
{
280+
return buffer_->dequeue();
281+
}
282+
283+
private:
284+
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;
285+
};
286+
242287
} // namespace buffers
243288
} // namespace experimental
244289
} // namespace rclcpp

0 commit comments

Comments
 (0)