Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/any_executable.cpp
src/rclcpp/callback_group.cpp
src/rclcpp/client.cpp
src/rclcpp/client_intra_process_base.cpp
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
Expand Down Expand Up @@ -97,6 +98,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/service_intra_process_base.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
src/rclcpp/subscription_intra_process_base.cpp
Expand Down
122 changes: 115 additions & 7 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@
#include "rcl/wait.h"

#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/client_intra_process.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
Expand All @@ -45,6 +49,8 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -130,7 +136,7 @@ class ClientBase
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);

RCLCPP_PUBLIC
virtual ~ClientBase() = default;
virtual ~ClientBase();

/// Take the next response for this client as a type erased pointer.
/**
Expand Down Expand Up @@ -251,6 +257,15 @@ class ClientBase
rclcpp::QoS
get_response_subscription_actual_qos() const;

/// Return the waitable for intra-process
/**
* \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
* \throws std::runtime_error if the intra process manager is destroyed
*/
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_intra_process_waitable();

/// Set a callback to be called when each new response is received.
/**
* The callback receives a size_t which is the number of responses received
Expand Down Expand Up @@ -354,6 +369,19 @@ class ClientBase
void
set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

/// Implementation detail.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_client_id,
IntraProcessManagerWeakPtr weak_ipm);

std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
std::atomic_uint ipc_sequence_number_{1};

rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
std::shared_ptr<rcl_node_t> node_handle_;
std::shared_ptr<rclcpp::Context> context_;
Expand All @@ -363,6 +391,11 @@ class ClientBase

std::atomic<bool> in_use_by_wait_set_{false};

std::recursive_mutex ipc_mutex_;
bool use_intra_process_{false};
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_client_id_;

std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_response_callback_{nullptr};
};
Expand Down Expand Up @@ -460,12 +493,14 @@ class Client : public ClientBase
* \param[in] node_graph The node graph interface of the corresponding node.
* \param[in] service_name Name of the topic to publish to.
* \param[in] client_options options for the subscription.
* \param[in] ipc_setting Intra-process communication setting for the client.
*/
Client(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
const std::string & service_name,
rcl_client_options_t & client_options)
rcl_client_options_t & client_options,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
: ClientBase(node_base, node_graph)
{
using rosidl_typesupport_cpp::get_service_type_support_handle;
Expand All @@ -490,10 +525,27 @@ class Client : public ClientBase
}
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
}

// Setup intra process if requested.
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
create_intra_process_client();
}
}

virtual ~Client()
{
if (!use_intra_process_) {
return;
}
auto ipm = weak_ipm_.lock();
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a client.");
return;
}
ipm->remove_client(intra_process_client_id_);
}

/// Take the next response for this client.
Expand Down Expand Up @@ -610,7 +662,7 @@ class Client : public ClientBase
Promise promise;
auto future = promise.get_future();
auto req_id = async_send_request_impl(
*request,
std::move(request),
std::move(promise));
return FutureAndRequestId(std::move(future), req_id);
}
Expand Down Expand Up @@ -645,7 +697,7 @@ class Client : public ClientBase
Promise promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
std::move(request),
std::make_tuple(
CallbackType{std::forward<CallbackT>(cb)},
shared_future,
Expand Down Expand Up @@ -676,7 +728,7 @@ class Client : public ClientBase
PromiseWithRequest promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
request,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not std::move as in the other calls above?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not moving here because needs to be valid for later use in line 718

std::make_tuple(
CallbackWithRequestType{std::forward<CallbackT>(cb)},
request,
Expand Down Expand Up @@ -789,10 +841,32 @@ class Client : public ClientBase
CallbackWithRequestTypeValueVariant>;

int64_t
async_send_request_impl(const Request & request, CallbackInfoVariant value)
async_send_request_impl(SharedRequest request, CallbackInfoVariant value)
{
std::lock_guard<std::recursive_mutex> lock(ipc_mutex_);
if (use_intra_process_) {
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process send called after destruction of intra process manager");
}
bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_);

// Check if there's an intra-process server available matching this client.
// If there's not, we fall back into inter-process communication, since
// the server might be available in another process or was configured to not use IPC.
if (intra_process_server_available) {
// Send intra-process request
ipm->send_intra_process_client_request<ServiceT>(
intra_process_client_id_,
std::make_pair(std::move(request), std::move(value)));
return ipc_sequence_number_++;
}
}

// Send inter-process request
int64_t sequence_number;
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
}
Expand Down Expand Up @@ -821,6 +895,40 @@ class Client : public ClientBase
return value;
}

void
create_intra_process_client()
{
// Check if the QoS is compatible with intra-process.
auto qos_profile = get_response_subscription_actual_qos();

if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos_profile.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with 0 depth qos policy");
}
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}

// Create a ClientIntraProcess which will be given to the intra-process manager.
using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;

client_intra_process_ = std::make_shared<ClientIntraProcessT>(
context_,
this->get_service_name(),
qos_profile);

// Add it to the intra process manager.
using rclcpp::experimental::IntraProcessManager;
auto ipm = context_->get_sub_context<IntraProcessManager>();
uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_);
this->setup_intra_process(intra_process_client_id, ipm);
}

RCLCPP_DISABLE_COPY(Client)

std::unordered_map<
Expand Down
13 changes: 9 additions & 4 deletions rclcpp/include/rclcpp/create_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace rclcpp
* \param[in] service_name The name on which the service is accessible.
* \param[in] qos Quality of service profile for client.
* \param[in] group Callback group to handle the reply to service calls.
* \param[in] ipc_setting The intra-process setting for the client.
* \return Shared pointer to the created client.
*/
template<typename ServiceT>
Expand All @@ -46,13 +47,15 @@ create_client(
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
const std::string & service_name,
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
rclcpp::CallbackGroup::SharedPtr group = nullptr)
rclcpp::CallbackGroup::SharedPtr group = nullptr,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
return create_client<ServiceT>(
node_base, node_graph, node_services,
service_name,
qos.get_rmw_qos_profile(),
group);
group,
ipc_setting);
}

/// Create a service client with a given type.
Expand All @@ -65,7 +68,8 @@ create_client(
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
const std::string & service_name,
const rmw_qos_profile_t & qos_profile,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
rcl_client_options_t options = rcl_client_get_default_options();
options.qos = qos_profile;
Expand All @@ -74,7 +78,8 @@ create_client(
node_base.get(),
node_graph,
service_name,
options);
options,
ipc_setting);

auto cli_base_ptr = std::dynamic_pointer_cast<rclcpp::ClientBase>(cli);
node_services->add_client(cli_base_ptr, group);
Expand Down
13 changes: 8 additions & 5 deletions rclcpp/include/rclcpp/create_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace rclcpp
* \param[in] callback The callback to call when the service gets a request.
* \param[in] qos Quality of service profile for the service.
* \param[in] group Callback group to handle the reply to service calls.
* \param[in] ipc_setting The intra-process setting for the service.
* \return Shared pointer to the created service.
*/
template<typename ServiceT, typename CallbackT>
Expand All @@ -46,11 +47,12 @@ create_service(
const std::string & service_name,
CallbackT && callback,
const rclcpp::QoS & qos,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
return create_service<ServiceT, CallbackT>(
node_base, node_services, service_name,
std::forward<CallbackT>(callback), qos.get_rmw_qos_profile(), group);
std::forward<CallbackT>(callback), qos.get_rmw_qos_profile(), group, ipc_setting);
}

/// Create a service with a given type.
Expand All @@ -63,7 +65,8 @@ create_service(
const std::string & service_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
rclcpp::AnyServiceCallback<ServiceT> any_service_callback;
any_service_callback.set(std::forward<CallbackT>(callback));
Expand All @@ -72,8 +75,8 @@ create_service(
service_options.qos = qos_profile;

auto serv = Service<ServiceT>::make_shared(
node_base->get_shared_rcl_node_handle(),
service_name, any_service_callback, service_options);
node_base,
service_name, any_service_callback, service_options, ipc_setting);
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
node_services->add_service(serv_base_ptr, group);
return serv;
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ namespace detail
{

/// Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
template<typename OptionsT, typename NodeBaseT>
template<typename NodeBaseT>
bool
resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base)
resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base)
{
bool use_intra_process;
switch (options.use_intra_process_comm) {
switch (ipc_setting) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand Down
Loading