From 1ac2c8d99cdaa3929e94470f974ffb608300abf5 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 24 Feb 2024 00:57:12 +0530 Subject: [PATCH 1/2] serialise disconnect logic --- crates/client-api/src/routes/subscribe.rs | 7 ++++++- crates/core/src/host/module_host.rs | 11 +---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 6370dc23f75..0243276b047 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -306,7 +306,12 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut // https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/aws_engineer/solving_a_deadlock.html handle_queue.clear(); - client.module.disconnect_client(client.id).await; + // ignore NoSuchModule; if the module's already closed, that's fine + let _ = client.module.subscriptions().remove_subscriber(client.id); + let _ = client + .module + .call_identity_connected_disconnected(client.id.identity, client.id.address, false) + .await; } enum ClientMessage { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index d667d325cd5..fda94e870dd 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use super::host_controller::HostThreadpool; use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Timestamp}; -use crate::client::{ClientActorId, ClientConnectionSender}; +use crate::client::ClientConnectionSender; use crate::database_logger::LogLevel; use crate::db::datastore::traits::{TxData, TxOp}; use crate::db::relational_db::RelationalDB; @@ -500,15 +500,6 @@ impl ModuleHost { Ok(rx.await.expect("instance panicked")) } - pub async fn disconnect_client(&self, client_id: ClientActorId) { - tokio::join!( - async { self.subscriptions().remove_subscriber(client_id) }, - self.call_identity_connected_disconnected(client_id.identity, client_id.address, false) - // ignore NoSuchModule; if the module's already closed, that's fine - .map(drop) - ); - } - pub async fn call_identity_connected_disconnected( &self, caller_identity: Identity, From cfd89422aa66cc86ad2065eb5c800f6815c87503 Mon Sep 17 00:00:00 2001 From: Noa Date: Mon, 26 Feb 2024 12:48:01 -0600 Subject: [PATCH 2/2] Keep the ModuleHost::disconnect_client method --- crates/client-api/src/routes/subscribe.rs | 7 +------ crates/core/src/host/module_host.rs | 10 +++++++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 0243276b047..6370dc23f75 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -306,12 +306,7 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut // https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/aws_engineer/solving_a_deadlock.html handle_queue.clear(); - // ignore NoSuchModule; if the module's already closed, that's fine - let _ = client.module.subscriptions().remove_subscriber(client.id); - let _ = client - .module - .call_identity_connected_disconnected(client.id.identity, client.id.address, false) - .await; + client.module.disconnect_client(client.id).await; } enum ClientMessage { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index fda94e870dd..64215344748 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use super::host_controller::HostThreadpool; use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Timestamp}; -use crate::client::ClientConnectionSender; +use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger::LogLevel; use crate::db::datastore::traits::{TxData, TxOp}; use crate::db::relational_db::RelationalDB; @@ -500,6 +500,14 @@ impl ModuleHost { Ok(rx.await.expect("instance panicked")) } + pub async fn disconnect_client(&self, client_id: ClientActorId) { + self.subscriptions().remove_subscriber(client_id); + // ignore NoSuchModule; if the module's already closed, that's fine + let _ = self + .call_identity_connected_disconnected(client_id.identity, client_id.address, false) + .await; + } + pub async fn call_identity_connected_disconnected( &self, caller_identity: Identity,