Skip to content

Implement a proper graceful shutdown for the UDP tracker #596

@josecelano

Description

@josecelano

Although the UDP server has a method called start_with_graceful_shutdown, it does not actually perform a graceful shutdown. The server catches the halt signal but does nothing with it other than print a message.

@da2ce7 has been working on a proper implementation. See #594 (comment)

I'm reproducing his example here for reference:

impl UdpServer<Stopped>

/// Starts the UDP server, turning a `Stopped` server into a `Running` one.
///
/// Two one-shot channels are created: one on which the launched service
/// reports the address it is bound to, and one whose sender is stored in the
/// returned server's state as `halt_task`. Once the address is known it is
/// submitted through `form`.
///
/// # Panics
///
/// Panics if the started notification is never received, or if the
/// registration cannot be sent through `form`.
pub async fn start(self, tracker: Arc<Tracker>, form: RegistrationForm) -> Result<UdpServer<Running>, Error> {
    let (started_sender, started_receiver) = tokio::sync::oneshot::channel::<Started>();
    let (halt_sender, halt_receiver) = tokio::sync::oneshot::channel::<Halted>();

    // Launch first, then wait for the launched service to report its address.
    let task = self.state.launcher.start(tracker, started_sender, halt_receiver);
    let binding = started_receiver
        .await
        .expect("it should be able to start the service")
        .address;

    form.send(Listing::new(binding, super::check))
        .expect("it should be able to send service registration");

    trace!("Running UDP Tracker on Socket: {}", binding);

    Ok(UdpServer {
        state: Running {
            binding,
            halt_task: halt_sender,
            task,
        },
    })
}

impl Launcher

/// Spawns the UDP service as a Tokio task.
///
/// A fresh `Launcher` is built from this launcher's `bind_to` before the task
/// is spawned; the task runs the service on that launcher's `bind_to` and
/// yields the launcher as its output once the service returns.
pub fn start(
    &self,
    tracker: Arc<Tracker>,
    tx_start: oneshot::Sender<Started>,
    rx_halt: oneshot::Receiver<Halted>,
) -> JoinHandle<Launcher> {
    let handed_back = Launcher::new(self.bind_to);

    let service = async move {
        Udp::run_with_graceful_shutdown(tracker, handed_back.bind_to, tx_start, rx_halt).await;
        handed_back
    };

    tokio::spawn(service)
}

struct ActiveRequests

/// Holds the abort handles of the request tasks that are currently in flight.
///
/// NOTE(review): `run_with_graceful_shutdown` builds this with
/// `ActiveRequests::default()`, so a `Default` implementation is required;
/// none is visible in this excerpt — confirm it exists (e.g. via a derive).
struct ActiveRequests {
    /// Ring buffer with room for 50 handles: the number of requests we handle
    /// at the same time.
    rb: LocalRb<Static<AbortHandle, 50>>,
}

impl Drop for ActiveRequests {
    /// Empties the ring buffer, aborting every task that has not finished yet.
    fn drop(&mut self) {
        self.rb
            .pop_iter()
            .filter(|handle| !handle.is_finished())
            .for_each(|handle| handle.abort());
    }
}

impl Udp

/// Runs the UDP tracker on `bind_to` until the serving task ends or a halt is
/// signalled through `rx_halt`.
///
/// The socket is bound first and its local address is reported back through
/// `tx_start`. Requests are served by a dedicated task that keeps the abort
/// handle of each request task in an [`ActiveRequests`] ring buffer; when the
/// buffer is full the oldest handle is evicted and its task aborted if it is
/// still running. On halt the serving task is aborted.
///
/// # Panics
///
/// Panics if the socket cannot be bound, if its local address cannot be read,
/// or if the `Started` message cannot be delivered through `tx_start`.
async fn run_with_graceful_shutdown(
    tracker: Arc<Tracker>,
    bind_to: SocketAddr,
    tx_start: oneshot::Sender<Started>,
    rx_halt: oneshot::Receiver<Halted>,
) {
    // `expect` takes a plain `&str` and never interpolates `{...}`, so the
    // messages are built with `panic!` to include the real address and error.
    let socket = Arc::new(
        UdpSocket::bind(bind_to)
            .await
            .unwrap_or_else(|e| panic!("Could not bind to {bind_to}: {e}")),
    );
    let address = socket
        .local_addr()
        .unwrap_or_else(|e| panic!("Could not get local_addr from {bind_to}: {e}"));
    let halt = shutdown_signal_with_message(rx_halt, format!("Halting UDP Service Bound to Socket: {address}"));

    // `tracker` and `socket` are moved into the task, so no extra clone is
    // needed before the loop.
    let running = tokio::task::spawn(async move {
        let mut reqs = ActiveRequests::default();

        loop {
            // Wait for the next request, start handling it, and remember its
            // abort handle. A full buffer hands back the oldest handle.
            let old = reqs.rb.push_overwrite(
                Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(),
            );

            if let Some(h) = old {
                if !h.is_finished() {
                    h.abort();
                }
            }
        }
    });

    tx_start
        .send(Started { address })
        .expect("the UDP Tracker service should not be dropped");

    // `running` is consumed by `select!`, so grab the abort handle first.
    let stop = running.abort_handle();

    select! {
        _ = running => {},
        () = halt => {}
    }
    stop.abort();

    // Yield once so other tasks get a chance to run before returning; this
    // does not wait for them to finish.
    task::yield_now().await;
}

Metadata

Metadata

Assignees

Type

No type

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions