@@ -21,16 +21,26 @@ impl<'a> DeadlineQueue<'a> {
2121 Self { queue : VecDeque :: with_capacity ( capacity) }
2222 }
2323
24+ /// All calls to [`Instant::now`] go through this wrapper method.
25+ /// This makes it easier to find all places that read the current time.
26+ fn now ( & self ) -> Instant {
27+ Instant :: now ( )
28+ }
29+
2430 pub ( crate ) fn push ( & mut self , id : TestId , test : & ' a CollectedTest ) {
25- let deadline = Instant :: now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
31+ let deadline = self . now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
32+ if let Some ( back) = self . queue . back ( ) {
33+ assert ! ( back. deadline <= deadline) ;
34+ }
2635 self . queue . push_back ( DeadlineEntry { id, test, deadline } ) ;
2736 }
2837
29- /// Equivalent to `rx.read ()`, except that if any test exceeds its deadline
38+ /// Equivalent to `rx.recv ()`, except that if a test exceeds its deadline
3039 /// during the wait, the given callback will also be called for that test.
3140 pub ( crate ) fn read_channel_while_checking_deadlines < T > (
3241 & mut self ,
3342 rx : & mpsc:: Receiver < T > ,
43+ is_running : impl Fn ( TestId ) -> bool ,
3444 mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
3545 ) -> Result < T , RecvError > {
3646 loop {
@@ -39,18 +49,18 @@ impl<'a> DeadlineQueue<'a> {
3949 // deadline, so do a normal receive.
4050 return rx. recv ( ) ;
4151 } ;
42- let wait_duration = next_deadline. saturating_duration_since ( Instant :: now ( ) ) ;
52+ let next_deadline_timeout = next_deadline. saturating_duration_since ( self . now ( ) ) ;
53+
54+ let recv_result = rx. recv_timeout ( next_deadline_timeout) ;
55+ // Process deadlines after every receive attempt, regardless of
56+ // outcome, so that we don't build up an unbounded backlog of stale
57+ // entries due to a constant stream of tests finishing.
58+ self . for_each_entry_past_deadline ( & is_running, & mut on_deadline_passed) ;
4359
44- let recv_result = rx. recv_timeout ( wait_duration) ;
4560 match recv_result {
4661 Ok ( value) => return Ok ( value) ,
47- Err ( RecvTimeoutError :: Timeout ) => {
48- // Notify the callback of tests that have exceeded their
49- // deadline, then loop and do annother channel read.
50- for DeadlineEntry { id, test, .. } in self . remove_tests_past_deadline ( ) {
51- on_deadline_passed ( id, test) ;
52- }
53- }
62+ // Deadlines have already been processed, so loop and do another receive.
63+ Err ( RecvTimeoutError :: Timeout ) => { }
5464 Err ( RecvTimeoutError :: Disconnected ) => return Err ( RecvError ) ,
5565 }
5666 }
@@ -60,14 +70,28 @@ impl<'a> DeadlineQueue<'a> {
6070 Some ( self . queue . front ( ) ?. deadline )
6171 }
6272
63- fn remove_tests_past_deadline ( & mut self ) -> Vec < DeadlineEntry < ' a > > {
64- let now = Instant :: now ( ) ;
65- let mut timed_out = vec ! [ ] ;
66- while let Some ( deadline_entry) = pop_front_if ( & mut self . queue , |entry| now < entry. deadline )
67- {
68- timed_out. push ( deadline_entry) ;
73+ fn for_each_entry_past_deadline (
74+ & mut self ,
75+ is_running : impl Fn ( TestId ) -> bool ,
76+ mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
77+ ) {
78+ let now = self . now ( ) ;
79+
80+ // Clear out entries that are past their deadline, but only invoke the
81+ // callback for tests that are still considered running.
82+ while let Some ( entry) = pop_front_if ( & mut self . queue , |entry| entry. deadline <= now) {
83+ if is_running ( entry. id ) {
84+ on_deadline_passed ( entry. id , entry. test ) ;
85+ }
86+ }
87+
88+ // Also clear out any leading entries that are no longer running, even
89+ // if their deadline hasn't been reached.
90+ while let Some ( _) = pop_front_if ( & mut self . queue , |entry| !is_running ( entry. id ) ) { }
91+
92+ if let Some ( front) = self . queue . front ( ) {
93+ assert ! ( now < front. deadline) ;
6994 }
70- timed_out
7195 }
7296}
7397
0 commit comments