5
5
//!
6
6
//! [1]: https://github.com/ros2-rust/ros2_rust/blob/master/README.md
7
7
8
+ use std:: future:: Future ;
9
+ use std:: pin:: Pin ;
10
+ use std:: sync:: Arc ;
11
+ use std:: task:: Poll ;
12
+ use std:: time:: Duration ;
13
+
8
14
mod context;
9
15
mod error;
10
16
mod node;
@@ -20,37 +26,70 @@ pub use qos::*;
20
26
pub use wait:: * ;
21
27
22
28
use rcl_bindings:: rcl_context_is_valid;
23
- use std:: time:: Duration ;
29
+
30
+ pub use rcl_bindings:: rmw_request_id_t;
31
+
32
+ use parking_lot:: Mutex ;
24
33
25
34
/// Polls the node for new messages and executes the corresponding callbacks.
26
35
///
27
36
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
28
37
///
29
38
/// This may under some circumstances return
30
- /// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
39
+ /// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
40
+ /// set spuriously wakes up.
31
41
/// This can usually be ignored.
32
42
///
33
43
/// [1]: crate::SubscriberErrorCode
44
+ /// [2]: crate::ClientErrorCode
45
+ /// [3]: crate::ServiceErrorCode
34
46
pub fn spin_once ( node : & Node , timeout : Option < Duration > ) -> Result < ( ) , RclrsError > {
35
47
let live_subscriptions = node. live_subscriptions ( ) ;
48
+ let live_clients = node. live_clients ( ) ;
49
+ let live_services = node. live_services ( ) ;
36
50
let ctx = Context {
37
51
handle : node. context . clone ( ) ,
38
52
} ;
39
- let mut wait_set = WaitSet :: new ( live_subscriptions. len ( ) , & ctx) ?;
53
+ let mut wait_set = WaitSet :: new (
54
+ live_subscriptions. len ( ) ,
55
+ 0 ,
56
+ 0 ,
57
+ live_clients. len ( ) ,
58
+ live_services. len ( ) ,
59
+ 0 ,
60
+ & ctx,
61
+ ) ?;
40
62
41
63
for live_subscription in & live_subscriptions {
42
64
wait_set. add_subscription ( live_subscription. clone ( ) ) ?;
43
65
}
44
66
67
+ for live_client in & live_clients {
68
+ wait_set. add_client ( live_client. clone ( ) ) ?;
69
+ }
70
+
71
+ for live_service in & live_services {
72
+ wait_set. add_service ( live_service. clone ( ) ) ?;
73
+ }
74
+
45
75
let ready_entities = wait_set. wait ( timeout) ?;
76
+
46
77
for ready_subscription in ready_entities. subscriptions {
47
78
ready_subscription. execute ( ) ?;
48
79
}
49
80
81
+ for ready_client in ready_entities. clients {
82
+ ready_client. execute ( ) ?;
83
+ }
84
+
85
+ for ready_service in ready_entities. services {
86
+ ready_service. execute ( ) ?;
87
+ }
88
+
50
89
Ok ( ( ) )
51
90
}
52
91
53
- /// Convenience function for calling [`spin_once`] in a loop.
92
+ /// Convenience function for calling [`rclrs:: spin_once`] in a loop.
54
93
///
55
94
/// This function additionally checks that the context is still valid.
56
95
pub fn spin ( node : & Node ) -> Result < ( ) , RclrsError > {
@@ -70,6 +109,28 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
70
109
} ;
71
110
}
72
111
}
73
-
74
112
Ok ( ( ) )
75
113
}
114
+
115
+ pub fn spin_until_future_complete < T : Unpin + Clone > (
116
+ node : & node:: Node ,
117
+ mut future : Arc < Mutex < Box < future:: RclFuture < T > > > > ,
118
+ ) -> Result < <future:: RclFuture < T > as Future >:: Output , RclReturnCode > {
119
+ let mut cx = future:: create_rcl_waker_context ( ) ;
120
+
121
+ loop {
122
+ let context_valid = unsafe { rcl_context_is_valid ( & mut * node. context . lock ( ) ) } ;
123
+ if context_valid {
124
+ if let Some ( error) = spin_once ( node, None ) . err ( ) {
125
+ match error {
126
+ RclReturnCode :: Timeout => continue ,
127
+ error => return Err ( error) ,
128
+ } ;
129
+ } ;
130
+ match Future :: poll ( Pin :: new ( & mut * future. lock ( ) ) , & mut cx) {
131
+ Poll :: Ready ( val) => break Ok ( val) ,
132
+ Poll :: Pending => continue ,
133
+ } ;
134
+ }
135
+ }
136
+ }
0 commit comments