@@ -24,7 +24,7 @@ use std::sync::Arc;
2424use aquatic_udp_protocol:: Response ;
2525use derive_more:: Constructor ;
2626use log:: { debug, error, info, trace} ;
27- use ringbuf:: traits:: { Consumer , Observer , RingBuffer } ;
27+ use ringbuf:: traits:: { Consumer , Observer , Producer } ;
2828use ringbuf:: StaticRb ;
2929use tokio:: net:: UdpSocket ;
3030use tokio:: sync:: oneshot;
@@ -202,11 +202,23 @@ impl Launcher {
202202 }
203203}
204204
205- #[ derive( Default ) ]
206205struct ActiveRequests {
207206 rb : StaticRb < AbortHandle , 50 > , // the number of requests we handle at the same time.
208207}
209208
209+ impl ActiveRequests {
210+ /// Creates a new [`ActiveRequests`] filled with finished tasks.
211+ async fn new ( ) -> Self {
212+ let mut rb = StaticRb :: default ( ) ;
213+
214+ let ( ) = while rb. try_push ( tokio:: task:: spawn_blocking ( || ( ) ) . abort_handle ( ) ) . is_ok ( ) { } ;
215+
216+ task:: yield_now ( ) . await ;
217+
218+ Self { rb }
219+ }
220+ }
221+
210222impl std:: fmt:: Debug for ActiveRequests {
211223 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
212224 let ( left, right) = & self . rb . as_slices ( ) ;
@@ -280,15 +292,22 @@ impl Udp {
280292 let tracker = tracker. clone ( ) ;
281293 let socket = socket. clone ( ) ;
282294
283- let reqs = & mut ActiveRequests :: default ( ) ;
295+ let reqs = & mut ActiveRequests :: new ( ) . await ;
284296
285- // Main Waiting Loop, awaits on async [`receive_request`].
286297 loop {
287- if let Some ( h) = reqs. rb . push_overwrite (
288- Self :: spawn_request_processor ( Self :: receive_request ( socket. clone ( ) ) . await , tracker. clone ( ) , socket. clone ( ) )
289- . abort_handle ( ) ,
290- ) {
291- if !h. is_finished ( ) {
298+ task:: yield_now ( ) . await ;
299+ for h in reqs. rb . iter_mut ( ) {
300+ if h. is_finished ( ) {
301+ std:: mem:: swap (
302+ h,
303+ & mut Self :: spawn_request_processor (
304+ Self :: receive_request ( socket. clone ( ) ) . await ,
305+ tracker. clone ( ) ,
306+ socket. clone ( ) ,
307+ )
308+ . abort_handle ( ) ,
309+ ) ;
310+ } else {
292311 // the task is still running, lets yield and give it a chance to flush.
293312 tokio:: task:: yield_now ( ) . await ;
294313
@@ -299,6 +318,9 @@ impl Udp {
299318 tracing:: span!(
300319 target: "UDP TRACKER" ,
301320 tracing:: Level :: WARN , "request-aborted" , server_socket_addr = %server_socket_addr) ;
321+
322+ // force-break a single thread, then loop again.
323+ break ;
302324 }
303325 }
304326 }
@@ -396,13 +418,46 @@ mod tests {
396418 use std:: sync:: Arc ;
397419 use std:: time:: Duration ;
398420
399- use tokio :: time :: sleep ;
421+ use ringbuf :: traits :: { Consumer , Observer , RingBuffer } ;
400422 use torrust_tracker_test_helpers:: configuration:: ephemeral_mode_public;
401423
424+ use super :: ActiveRequests ;
402425 use crate :: bootstrap:: app:: initialize_with_configuration;
403426 use crate :: servers:: registar:: Registar ;
404427 use crate :: servers:: udp:: server:: { Launcher , UdpServer } ;
405428
429+ #[ tokio:: test]
430+ async fn it_should_return_to_the_start_of_the_ring_buffer ( ) {
431+ let mut a_req = ActiveRequests :: new ( ) . await ;
432+
433+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
434+
435+ let mut count: usize = 0 ;
436+ let cap: usize = a_req. rb . capacity ( ) . into ( ) ;
437+
438+ // Add a single pending task to check that the ring-buffer is looping correctly.
439+ a_req
440+ . rb
441+ . push_overwrite ( tokio:: task:: spawn ( std:: future:: pending :: < ( ) > ( ) ) . abort_handle ( ) ) ;
442+
443+ count += 1 ;
444+
445+ for _ in 0 ..2 {
446+ for h in a_req. rb . iter ( ) {
447+ let first = count % cap;
448+ println ! ( "{count},{first},{}" , h. is_finished( ) ) ;
449+
450+ if first == 0 {
451+ assert ! ( !h. is_finished( ) ) ;
452+ } else {
453+ assert ! ( h. is_finished( ) ) ;
454+ }
455+
456+ count += 1 ;
457+ }
458+ }
459+ }
460+
406461 #[ tokio:: test]
407462 async fn it_should_be_able_to_start_and_stop ( ) {
408463 let cfg = Arc :: new ( ephemeral_mode_public ( ) ) ;
@@ -423,7 +478,7 @@ mod tests {
423478 . expect ( "it should start the server" ) ;
424479 let stopped = started. stop ( ) . await . expect ( "it should stop the server" ) ;
425480
426- sleep ( Duration :: from_secs ( 1 ) ) . await ;
481+ tokio :: time :: sleep ( Duration :: from_secs ( 1 ) ) . await ;
427482
428483 assert_eq ! ( stopped. state. launcher. bind_to, bind_to) ;
429484 }
0 commit comments