Skip to content

Commit 3df73f0

Browse files
Implement generic client (#2358)
* Implement generic client Signed-off-by: Barry Xu <[email protected]> * Fix the incorrect parameter declaration Signed-off-by: Barry Xu <[email protected]> * Deleted copy constructor and assignment for FutureAndRequestId Signed-off-by: Barry Xu <[email protected]> * Update codes after rebase Signed-off-by: Barry Xu <[email protected]> * Address review comments Signed-off-by: Barry Xu <[email protected]> * Address review comments from iuhilnehc-ynos Signed-off-by: Barry Xu <[email protected]> * Correct an error in a description Signed-off-by: Barry Xu <[email protected]> * Fix window build errors Signed-off-by: Barry Xu <[email protected]> * Address review comments from William Signed-off-by: Barry Xu <[email protected]> * Add doc strings to create_generic_client Signed-off-by: Barry Xu <[email protected]> --------- Signed-off-by: Barry Xu <[email protected]>
1 parent b007204 commit 3df73f0

File tree

12 files changed

+1413
-398
lines changed

12 files changed

+1413
-398
lines changed

rclcpp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ set(${PROJECT_NAME}_SRCS
4545
src/rclcpp/clock.cpp
4646
src/rclcpp/context.cpp
4747
src/rclcpp/contexts/default_context.cpp
48+
src/rclcpp/create_generic_client.cpp
4849
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
4950
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
5051
src/rclcpp/detail/resolve_parameter_overrides.cpp
@@ -74,6 +75,7 @@ set(${PROJECT_NAME}_SRCS
7475
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
7576
src/rclcpp/experimental/timers_manager.cpp
7677
src/rclcpp/future_return_code.cpp
78+
src/rclcpp/generic_client.cpp
7779
src/rclcpp/generic_publisher.cpp
7880
src/rclcpp/generic_subscription.cpp
7981
src/rclcpp/graph_listener.cpp

rclcpp/include/rclcpp/client.hpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,29 @@ struct FutureAndRequestId
115115
/// Destructor.
116116
~FutureAndRequestId() = default;
117117
};
118+
119+
template<typename PendingRequestsT, typename AllocatorT = std::allocator<int64_t>>
120+
size_t
121+
prune_requests_older_than_impl(
122+
PendingRequestsT & pending_requests,
123+
std::mutex & pending_requests_mutex,
124+
std::chrono::time_point<std::chrono::system_clock> time_point,
125+
std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
126+
{
127+
std::lock_guard guard(pending_requests_mutex);
128+
auto old_size = pending_requests.size();
129+
for (auto it = pending_requests.begin(), last = pending_requests.end(); it != last; ) {
130+
if (it->second.first < time_point) {
131+
if (pruned_requests) {
132+
pruned_requests->push_back(it->first);
133+
}
134+
it = pending_requests.erase(it);
135+
} else {
136+
++it;
137+
}
138+
}
139+
return old_size - pending_requests.size();
140+
}
118141
} // namespace detail
119142

120143
namespace node_interfaces
@@ -771,19 +794,11 @@ class Client : public ClientBase
771794
std::chrono::time_point<std::chrono::system_clock> time_point,
772795
std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
773796
{
774-
std::lock_guard guard(pending_requests_mutex_);
775-
auto old_size = pending_requests_.size();
776-
for (auto it = pending_requests_.begin(), last = pending_requests_.end(); it != last; ) {
777-
if (it->second.first < time_point) {
778-
if (pruned_requests) {
779-
pruned_requests->push_back(it->first);
780-
}
781-
it = pending_requests_.erase(it);
782-
} else {
783-
++it;
784-
}
785-
}
786-
return old_size - pending_requests_.size();
797+
return detail::prune_requests_older_than_impl(
798+
pending_requests_,
799+
pending_requests_mutex_,
800+
time_point,
801+
pruned_requests);
787802
}
788803

789804
/// Configure client introspection.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2023 Sony Group Corporation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__CREATE_GENERIC_CLIENT_HPP_
16+
#define RCLCPP__CREATE_GENERIC_CLIENT_HPP_
17+
18+
#include <memory>
19+
#include <string>
20+
21+
#include "rclcpp/generic_client.hpp"
22+
#include "rclcpp/node_interfaces/get_node_base_interface.hpp"
23+
#include "rclcpp/node_interfaces/get_node_graph_interface.hpp"
24+
#include "rclcpp/node_interfaces/get_node_services_interface.hpp"
25+
#include "rclcpp/node_interfaces/node_base_interface.hpp"
26+
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
27+
#include "rclcpp/node_interfaces/node_services_interface.hpp"
28+
#include "rclcpp/qos.hpp"
29+
30+
namespace rclcpp
31+
{
32+
/// Create a generic service client with a name of given type.
33+
/**
34+
* \param[in] node_base NodeBaseInterface implementation of the node on which
35+
* to create the client.
36+
* \param[in] node_graph NodeGraphInterface implementation of the node on which
37+
* to create the client.
38+
* \param[in] node_services NodeServicesInterface implementation of the node on
39+
* which to create the client.
40+
* \param[in] service_name The name on which the service is accessible.
41+
* \param[in] service_type The name of service type, e.g. "test_msgs/srv/BasicTypes"
42+
* \param[in] qos Quality of service profile for client.
43+
* \param[in] group Callback group to handle the reply to service calls.
44+
* \return Shared pointer to the created client.
45+
*/
46+
RCLCPP_PUBLIC
47+
rclcpp::GenericClient::SharedPtr
48+
create_generic_client(
49+
std::shared_ptr<node_interfaces::NodeBaseInterface> node_base,
50+
std::shared_ptr<node_interfaces::NodeGraphInterface> node_graph,
51+
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
52+
const std::string & service_name,
53+
const std::string & service_type,
54+
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
55+
rclcpp::CallbackGroup::SharedPtr group = nullptr);
56+
57+
/// Create a generic service client with a name of given type.
58+
/**
59+
* The NodeT type needs to have NodeBaseInterface implementation, NodeGraphInterface implementation
60+
* and NodeServicesInterface implementation of the node which to create the client.
61+
*
62+
* \param[in] node The node on which to create the client.
63+
* \param[in] service_name The name on which the service is accessible.
64+
* \param[in] service_type The name of service type, e.g. "test_msgs/srv/BasicTypes"
65+
* \param[in] qos Quality of service profile for client.
66+
* \param[in] group Callback group to handle the reply to service calls.
67+
* \return Shared pointer to the created client.
68+
*/
69+
template<typename NodeT>
70+
rclcpp::GenericClient::SharedPtr
71+
create_generic_client(
72+
NodeT node,
73+
const std::string & service_name,
74+
const std::string & service_type,
75+
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
76+
rclcpp::CallbackGroup::SharedPtr group = nullptr)
77+
{
78+
return create_generic_client(
79+
rclcpp::node_interfaces::get_node_base_interface(node),
80+
rclcpp::node_interfaces::get_node_graph_interface(node),
81+
rclcpp::node_interfaces::get_node_services_interface(node),
82+
service_name,
83+
service_type,
84+
qos,
85+
group
86+
);
87+
}
88+
} // namespace rclcpp
89+
90+
#endif // RCLCPP__CREATE_GENERIC_CLIENT_HPP_
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright 2023 Sony Group Corporation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__GENERIC_CLIENT_HPP_
16+
#define RCLCPP__GENERIC_CLIENT_HPP_
17+
18+
#include <map>
19+
#include <memory>
20+
#include <future>
21+
#include <string>
22+
#include <vector>
23+
#include <utility>
24+
25+
#include "rcl/client.h"
26+
27+
#include "rclcpp/client.hpp"
28+
#include "rclcpp/visibility_control.hpp"
29+
#include "rcpputils/shared_library.hpp"
30+
31+
#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp"
32+
33+
namespace rclcpp
34+
{
35+
class GenericClient : public ClientBase
36+
{
37+
public:
38+
using Request = void *; // Serialized data pointer of request message
39+
using Response = void *; // Serialized data pointer of response message
40+
41+
using SharedResponse = std::shared_ptr<void>;
42+
43+
using Promise = std::promise<SharedResponse>;
44+
using SharedPromise = std::shared_ptr<Promise>;
45+
46+
using Future = std::future<SharedResponse>;
47+
using SharedFuture = std::shared_future<SharedResponse>;
48+
49+
RCLCPP_SMART_PTR_DEFINITIONS(GenericClient)
50+
51+
/// A convenient GenericClient::Future and request id pair.
52+
/**
53+
* Public members:
54+
* - future: a std::future<void *>.
55+
* - request_id: the request id associated with the future.
56+
*
57+
* All the other methods are equivalent to the ones std::future provides.
58+
*/
59+
struct FutureAndRequestId
60+
: detail::FutureAndRequestId<Future>
61+
{
62+
using detail::FutureAndRequestId<Future>::FutureAndRequestId;
63+
64+
/// See std::future::share().
65+
SharedFuture share() noexcept {return this->future.share();}
66+
67+
/// Move constructor.
68+
FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
69+
/// Deleted copy constructor, each instance is a unique owner of the future.
70+
FutureAndRequestId(const FutureAndRequestId & other) = delete;
71+
/// Move assignment.
72+
FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
73+
/// Deleted copy assignment, each instance is a unique owner of the future.
74+
FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
75+
/// Destructor.
76+
~FutureAndRequestId() = default;
77+
};
78+
79+
GenericClient(
80+
rclcpp::node_interfaces::NodeBaseInterface * node_base,
81+
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
82+
const std::string & service_name,
83+
const std::string & service_type,
84+
rcl_client_options_t & client_options);
85+
86+
RCLCPP_PUBLIC
87+
SharedResponse
88+
create_response() override;
89+
90+
RCLCPP_PUBLIC
91+
std::shared_ptr<rmw_request_id_t>
92+
create_request_header() override;
93+
94+
RCLCPP_PUBLIC
95+
void
96+
handle_response(
97+
std::shared_ptr<rmw_request_id_t> request_header,
98+
std::shared_ptr<void> response) override;
99+
100+
/// Send a request to the service server.
101+
/**
102+
* This method returns a `FutureAndRequestId` instance
103+
* that can be passed to Executor::spin_until_future_complete() to
104+
* wait until it has been completed.
105+
*
106+
* If the future never completes,
107+
* e.g. the call to Executor::spin_until_future_complete() times out,
108+
* GenericClient::remove_pending_request() must be called to clean the client internal state.
109+
* Not doing so will make the `Client` instance to use more memory each time a response is not
110+
* received from the service server.
111+
*
112+
* ```cpp
113+
* auto future = client->async_send_request(my_request);
114+
* if (
115+
* rclcpp::FutureReturnCode::TIMEOUT ==
116+
* executor->spin_until_future_complete(future, timeout))
117+
* {
118+
* client->remove_pending_request(future);
119+
* // handle timeout
120+
* } else {
121+
* handle_response(future.get());
122+
* }
123+
* ```
124+
*
125+
* \param[in] request request to be send.
126+
* \return a FutureAndRequestId instance.
127+
*/
128+
RCLCPP_PUBLIC
129+
FutureAndRequestId
130+
async_send_request(const Request request);
131+
132+
/// Clean all pending requests older than a time_point.
133+
/**
134+
* \param[in] time_point Requests that were sent before this point are going to be removed.
135+
* \param[inout] pruned_requests Removed requests id will be pushed to the vector
136+
* if a pointer is provided.
137+
* \return number of pending requests that were removed.
138+
*/
139+
template<typename AllocatorT = std::allocator<int64_t>>
140+
size_t
141+
prune_requests_older_than(
142+
std::chrono::time_point<std::chrono::system_clock> time_point,
143+
std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
144+
{
145+
return detail::prune_requests_older_than_impl(
146+
pending_requests_,
147+
pending_requests_mutex_,
148+
time_point,
149+
pruned_requests);
150+
}
151+
152+
RCLCPP_PUBLIC
153+
size_t
154+
prune_pending_requests();
155+
156+
RCLCPP_PUBLIC
157+
bool
158+
remove_pending_request(
159+
int64_t request_id);
160+
161+
/// Take the next response for this client.
162+
/**
163+
* \sa ClientBase::take_type_erased_response().
164+
*
165+
* \param[out] response_out The reference to a Service Response into
166+
* which the middleware will copy the response being taken.
167+
* \param[out] request_header_out The request header to be filled by the
168+
* middleware when taking, and which can be used to associate the response
169+
* to a specific request.
170+
* \returns true if the response was taken, otherwise false.
171+
* \throws rclcpp::exceptions::RCLError based exceptions if the underlying
172+
* rcl function fail.
173+
*/
174+
RCLCPP_PUBLIC
175+
bool
176+
take_response(Response response_out, rmw_request_id_t & request_header_out)
177+
{
178+
return this->take_type_erased_response(response_out, request_header_out);
179+
}
180+
181+
protected:
182+
using CallbackInfoVariant = std::variant<
183+
std::promise<SharedResponse>>; // Use variant for extension
184+
185+
int64_t
186+
async_send_request_impl(
187+
const Request request,
188+
CallbackInfoVariant value);
189+
190+
std::optional<CallbackInfoVariant>
191+
get_and_erase_pending_request(
192+
int64_t request_number);
193+
194+
RCLCPP_DISABLE_COPY(GenericClient)
195+
196+
std::map<int64_t, std::pair<
197+
std::chrono::time_point<std::chrono::system_clock>,
198+
CallbackInfoVariant>> pending_requests_;
199+
std::mutex pending_requests_mutex_;
200+
201+
private:
202+
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_;
203+
const rosidl_typesupport_introspection_cpp::MessageMembers * response_members_;
204+
};
205+
} // namespace rclcpp
206+
207+
#endif // RCLCPP__GENERIC_CLIENT_HPP_

0 commit comments

Comments
 (0)