1515#ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_
1616#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_
1717
18+ #include < algorithm>
1819#include < memory>
1920#include < mutex>
2021#include < string>
2122#include < unordered_map>
2223#include < utility>
2324
2425#include " rcl/error_handling.h"
26+ #include " rmw/impl/cpp/demangle.hpp"
2527
2628#include " rclcpp/context.hpp"
2729#include " rclcpp/experimental/client_intra_process_base.hpp"
2830#include " rclcpp/guard_condition.hpp"
31+ #include " rclcpp/logging.hpp"
2932#include " rclcpp/qos.hpp"
3033#include " rclcpp/type_support_decl.hpp"
3134#include " rclcpp/waitable.hpp"
@@ -40,6 +43,11 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
4043public:
4144 RCLCPP_SMART_PTR_ALIASES_ONLY (ServiceIntraProcessBase)
4245
46+ enum class EntityType : std::size_t
47+ {
48+ Service,
49+ };
50+
4351 RCLCPP_PUBLIC
4452 ServiceIntraProcessBase (
4553 rclcpp::Context::SharedPtr context,
@@ -65,6 +73,13 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
6573 std::shared_ptr<void >
6674 take_data () = 0 ;
6775
76+ std::shared_ptr<void >
77+ take_data_by_entity_id (size_t id) override
78+ {
79+ (void )id;
80+ return take_data ();
81+ }
82+
6883 virtual void
6984 execute (std::shared_ptr<void > & data) = 0 ;
7085
@@ -82,9 +97,103 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
8297 rclcpp::experimental::ClientIntraProcessBase::SharedPtr client,
8398 uint64_t client_id);
8499
100+ // / Set a callback to be called when each new request arrives.
101+ /* *
102+ * The callback receives a size_t which is the number of requests received
103+ * since the last time this callback was called.
104+ * Normally this is 1, but can be > 1 if requests were received before any
105+ * callback was set.
106+ *
107+ * The callback also receives an int identifier argument.
108+ * This is needed because a Waitable may be composed of several distinct entities,
109+ * such as subscriptions, services, etc.
110+ * The application should provide a generic callback function that will be then
111+ * forwarded by the waitable to all of its entities.
112+ * Before forwarding, a different value for the identifier argument will be
113+ * bound to the function.
114+ * This implies that the provided callback can use the identifier to behave
115+ * differently depending on which entity triggered the waitable to become ready.
116+ *
117+ * Calling it again will clear any previously set callback.
118+ *
119+ * An exception will be thrown if the callback is not callable.
120+ *
121+ * This function is thread-safe.
122+ *
123+ * If you want more information available in the callback, like the subscription
124+ * or other information, you may use a lambda with captures or std::bind.
125+ *
126+ * \param[in] callback functor to be called when a new request is received.
127+ */
128+ void
129+ set_on_ready_callback (std::function<void (size_t , int )> callback) override
130+ {
131+ if (!callback) {
132+ throw std::invalid_argument (
133+ " The callback passed to set_on_ready_callback "
134+ " is not callable." );
135+ }
136+
137+ // Note: we bind the int identifier argument to this waitable's entity types
138+ auto new_callback =
139+ [callback, this ](size_t number_of_events) {
140+ try {
141+ callback (number_of_events, static_cast <int >(EntityType::Service));
142+ } catch (const std::exception & exception) {
143+ RCLCPP_ERROR_STREAM (
144+ // TODO(wjwwood): get this class access to the node logger it is associated with
145+ rclcpp::get_logger (" rclcpp" ),
146+ " rclcpp::ServiceIntraProcessBase@" << this <<
147+ " caught " << rmw::impl::cpp::demangle (exception) <<
148+ " exception in user-provided callback for the 'on ready' callback: " <<
149+ exception.what ());
150+ } catch (...) {
151+ RCLCPP_ERROR_STREAM (
152+ rclcpp::get_logger (" rclcpp" ),
153+ " rclcpp::ServiceIntraProcessBase@" << this <<
154+ " caught unhandled exception in user-provided callback " <<
155+ " for the 'on ready' callback" );
156+ }
157+ };
158+
159+ std::lock_guard<std::recursive_mutex> lock (reentrant_mutex_);
160+ on_new_request_callback_ = new_callback;
161+
162+ if (unread_count_ > 0 ) {
163+ if (qos_profile_.history () == HistoryPolicy::KeepAll) {
164+ on_new_request_callback_ (unread_count_);
165+ } else {
166+ // Use qos profile depth as upper bound for unread_count_
167+ on_new_request_callback_ (std::min (unread_count_, qos_profile_.depth ()));
168+ }
169+ unread_count_ = 0 ;
170+ }
171+ }
172+
173+ // / Unset the callback registered for new messages, if any.
174+ void
175+ clear_on_ready_callback () override
176+ {
177+ std::lock_guard<std::recursive_mutex> lock (reentrant_mutex_);
178+ on_new_request_callback_ = nullptr ;
179+ }
180+
85181protected:
86182 std::recursive_mutex reentrant_mutex_;
87183 rclcpp::GuardCondition gc_;
184+ std::function<void (size_t )> on_new_request_callback_ {nullptr };
185+ size_t unread_count_{0 };
186+
187+ void
188+ invoke_on_new_request ()
189+ {
190+ std::lock_guard<std::recursive_mutex> lock (reentrant_mutex_);
191+ if (on_new_request_callback_) {
192+ on_new_request_callback_ (1 );
193+ } else {
194+ unread_count_++;
195+ }
196+ }
88197
89198 using ClientMap =
90199 std::unordered_map<uint64_t , rclcpp::experimental::ClientIntraProcessBase::WeakPtr>;
0 commit comments