Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
endif()

set(${PROJECT_NAME}_SRCS
src/rclcpp/action_client_intra_process_base.cpp
src/rclcpp/action_server_intra_process_base.cpp
src/rclcpp/any_executable.cpp
src/rclcpp/callback_group.cpp
src/rclcpp/client.cpp
src/rclcpp/client_intra_process_base.cpp
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
Expand Down Expand Up @@ -108,6 +111,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/service_intra_process_base.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
src/rclcpp/subscription_intra_process_base.cpp
Expand Down
130 changes: 121 additions & 9 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@

#include "rclcpp/clock.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/client_intra_process.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
Expand All @@ -48,6 +52,8 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -133,7 +139,7 @@ class ClientBase
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);

RCLCPP_PUBLIC
virtual ~ClientBase() = default;
virtual ~ClientBase();

/// Take the next response for this client as a type erased pointer.
/**
Expand Down Expand Up @@ -254,6 +260,15 @@ class ClientBase
rclcpp::QoS
get_response_subscription_actual_qos() const;

/// Return the waitable for intra-process
/**
* \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
* \throws std::runtime_error if the intra process manager is destroyed
*/
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_intra_process_waitable();

/// Set a callback to be called when each new response is received.
/**
* The callback receives a size_t which is the number of responses received
Expand Down Expand Up @@ -358,17 +373,39 @@ class ClientBase
void
set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

/// Implementation detail.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_client_id,
IntraProcessManagerWeakPtr weak_ipm);

std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
std::atomic_uint ipc_sequence_number_{1};

rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
std::shared_ptr<rcl_node_t> node_handle_;
std::shared_ptr<rclcpp::Context> context_;
rclcpp::Logger node_logger_;

std::recursive_mutex callback_mutex_;
// It is important to declare on_new_response_callback_ before
// client_handle_, so on destruction the client is
// destroyed first. Otherwise, the rmw client callback
// would point briefly to a destroyed function.
std::function<void(size_t)> on_new_response_callback_{nullptr};
// Declare client_handle_ after callback
std::shared_ptr<rcl_client_t> client_handle_;

std::atomic<bool> in_use_by_wait_set_{false};

std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_response_callback_{nullptr};
std::recursive_mutex ipc_mutex_;
bool use_intra_process_{false};
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_client_id_;
};

template<typename ServiceT>
Expand Down Expand Up @@ -464,12 +501,14 @@ class Client : public ClientBase
* \param[in] node_graph The node graph interface of the corresponding node.
* \param[in] service_name Name of the topic to publish to.
* \param[in] client_options options for the subscription.
* \param[in] ipc_setting Intra-process communication setting for the client.
*/
Client(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
const std::string & service_name,
rcl_client_options_t & client_options)
rcl_client_options_t & client_options,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
: ClientBase(node_base, node_graph),
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
{
Expand All @@ -492,10 +531,27 @@ class Client : public ClientBase
}
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
}

// Setup intra process if requested.
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
create_intra_process_client();
}
}

virtual ~Client()
{
if (!use_intra_process_) {
return;
}
auto ipm = weak_ipm_.lock();
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a client.");
return;
}
ipm->remove_client(intra_process_client_id_);
}

/// Take the next response for this client.
Expand Down Expand Up @@ -612,7 +668,7 @@ class Client : public ClientBase
Promise promise;
auto future = promise.get_future();
auto req_id = async_send_request_impl(
*request,
std::move(request),
std::move(promise));
return FutureAndRequestId(std::move(future), req_id);
}
Expand Down Expand Up @@ -647,7 +703,7 @@ class Client : public ClientBase
Promise promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
std::move(request),
std::make_tuple(
CallbackType{std::forward<CallbackT>(cb)},
shared_future,
Expand Down Expand Up @@ -678,7 +734,7 @@ class Client : public ClientBase
PromiseWithRequest promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
request,
std::make_tuple(
CallbackWithRequestType{std::forward<CallbackT>(cb)},
request,
Expand Down Expand Up @@ -820,11 +876,33 @@ class Client : public ClientBase
CallbackWithRequestTypeValueVariant>;

int64_t
async_send_request_impl(const Request & request, CallbackInfoVariant value)
async_send_request_impl(SharedRequest request, CallbackInfoVariant value)
{
std::lock_guard<std::recursive_mutex> ipc_lock(ipc_mutex_);
if (use_intra_process_) {
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process send called after destruction of intra process manager");
}
bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_);

// Check if there's an intra-process server available matching this client.
// If there's not, we fall back into inter-process communication, since
// the server might be available in another process or was configured to not use IPC.
if (intra_process_server_available) {
// Send intra-process request
ipm->send_intra_process_client_request<ServiceT>(
intra_process_client_id_,
std::make_pair(std::move(request), std::move(value)));
return ipc_sequence_number_++;
}
}

// Send inter-process request
int64_t sequence_number;
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
}
Expand All @@ -850,6 +928,40 @@ class Client : public ClientBase
return value;
}

void
create_intra_process_client()
{
// Check if the QoS is compatible with intra-process.
auto qos_profile = get_response_subscription_actual_qos();

if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos_profile.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with 0 depth qos policy");
}
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}

// Create a ClientIntraProcess which will be given to the intra-process manager.
using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;

client_intra_process_ = std::make_shared<ClientIntraProcessT>(
context_,
this->get_service_name(),
qos_profile);

// Add it to the intra process manager.
using rclcpp::experimental::IntraProcessManager;
auto ipm = context_->get_sub_context<IntraProcessManager>();
uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_);
this->setup_intra_process(intra_process_client_id, ipm);
}

RCLCPP_DISABLE_COPY(Client)

std::unordered_map<
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/create_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ create_client(
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
const std::string & service_name,
const rmw_qos_profile_t & qos_profile,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
rcl_client_options_t options = rcl_client_get_default_options();
options.qos = qos_profile;
Expand All @@ -74,7 +75,8 @@ create_client(
node_base.get(),
node_graph,
service_name,
options);
options,
ipc_setting);

auto cli_base_ptr = std::dynamic_pointer_cast<rclcpp::ClientBase>(cli);
node_services->add_client(cli_base_ptr, group);
Expand Down
7 changes: 4 additions & 3 deletions rclcpp/include/rclcpp/create_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ create_service(
const std::string & service_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
rclcpp::AnyServiceCallback<ServiceT> any_service_callback;
any_service_callback.set(std::forward<CallbackT>(callback));
Expand All @@ -72,8 +73,8 @@ create_service(
service_options.qos = qos_profile;

auto serv = Service<ServiceT>::make_shared(
node_base->get_shared_rcl_node_handle(),
service_name, any_service_callback, service_options);
node_base,
service_name, any_service_callback, service_options, ipc_setting);
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
node_services->add_service(serv_base_ptr, group);
return serv;
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ namespace detail
{

/// Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
template<typename OptionsT, typename NodeBaseT>
template<typename NodeBaseT>
bool
resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base)
resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base)
{
bool use_intra_process;
switch (options.use_intra_process_comm) {
switch (ipc_setting) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ class EventHandler : public EventHandlerBase
}
}

~EventHandler()
{
// Since the rmw event listener holds a reference to the
// "on ready" callback, we need to clear it on destruction of this class.
// This clearing is not needed for other rclcpp entities like pub/subs, since
// they do own the underlying rmw entities, which are destroyed
// on their rclcpp destructors, thus no risk of dangling pointers.
clear_on_ready_callback();
}

/// Take data so that the callback cannot be scheduled again
std::shared_ptr<void>
take_data() override
Expand Down
Loading