Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ impl ClientConnection {

pub async fn subscribe(&self, subscription: Subscribe) -> Result<(), DBError> {
let me = self.clone();
self.module
.threadpool()
.spawn_task(move || me.module.subscriptions().add_subscriber(me.sender, subscription))
tokio::task::spawn_blocking(move || me.module.subscriptions().add_subscriber(me.sender, subscription))
.await
.unwrap()
}

pub async fn one_off_query(&self, query: &str, message_id: &[u8]) -> Result<(), anyhow::Error> {
Expand Down
12 changes: 0 additions & 12 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;

use super::module_host::{Catalog, EntityDef, EventStatus, ModuleHost, NoSuchModule, UpdateDatabaseResult};
use super::scheduler::SchedulerStarter;
Expand Down Expand Up @@ -82,17 +81,6 @@ impl HostThreadpool {
pub fn spawn(&self, f: impl FnOnce() + Send + 'static) {
self.inner.spawn(f)
}

pub async fn spawn_task<R: Send + 'static>(&self, f: impl FnOnce() -> R + Send + 'static) -> R {
let (tx, rx) = oneshot::channel();
self.inner.spawn(|| {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
if let Err(Err(_panic)) = tx.send(result) {
tracing::warn!("uncaught panic on threadpool")
}
});
rx.await.unwrap().unwrap_or_else(|err| std::panic::resume_unwind(err))
}
}

#[derive(PartialEq, Eq, Hash, Copy, Clone, Serialize, Debug)]
Expand Down
17 changes: 6 additions & 11 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
use futures::{Future, FutureExt};
use indexmap::IndexMap;
use tokio::sync::oneshot;

use super::host_controller::HostThreadpool;
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Timestamp};
Expand Down Expand Up @@ -325,7 +326,6 @@ trait DynModuleHost: Send + Sync + 'static {
fn start(&self);
fn exit(&self) -> Closed<'_>;
fn exited(&self) -> Closed<'_>;
fn threadpool(&self) -> &HostThreadpool;
}

struct HostControllerActor<T: Module> {
Expand Down Expand Up @@ -413,10 +413,6 @@ impl<T: Module> DynModuleHost for HostControllerActor<T> {
fn exited(&self) -> Closed<'_> {
self.instance_pool.closed()
}

fn threadpool(&self) -> &HostThreadpool {
&self.threadpool
}
}

pub struct WeakModuleHost {
Expand Down Expand Up @@ -490,19 +486,18 @@ impl ModuleHost {
&self.info.subscriptions
}

pub fn threadpool(&self) -> &HostThreadpool {
self.inner.threadpool()
}

async fn call<F, R>(&self, _reducer_name: &str, f: F) -> Result<R, NoSuchModule>
where
F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
R: Send + 'static,
{
let (threadpool, mut inst) = self.inner.get_instance(self.info.address).await?;

let result = threadpool.spawn_task(move || f(&mut *inst)).await;
Ok(result)
let (tx, rx) = oneshot::channel();
threadpool.spawn(move || {
let _ = tx.send(f(&mut *inst));
});
Ok(rx.await.expect("instance panicked"))
}

pub async fn disconnect_client(&self, client_id: ClientActorId) {
Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,23 @@ impl ModuleSubscriptions {

/// Add a subscriber to the module. NOTE: this function is blocking.
pub fn add_subscriber(&self, sender: ClientConnectionSender, subscription: Subscribe) -> Result<(), DBError> {
let tx = scopeguard::guard(self.relational_db.begin_tx(), |tx| {
let tx = &mut *scopeguard::guard(self.relational_db.begin_tx(), |tx| {
let ctx = ExecutionContext::subscribe(self.relational_db.address());
self.relational_db.release_tx(&ctx, tx);
});

let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
let mut queries = QuerySet::new();
for sql in subscription.query_strings {
let qset = compile_read_only_query(&self.relational_db, &tx, &auth, &sql)?;
let qset = compile_read_only_query(&self.relational_db, tx, &auth, &sql)?;
queries.extend(qset);
}

let database_update = queries.eval(&self.relational_db, &tx, auth)?;
let database_update = tokio::task::block_in_place(|| queries.eval(&self.relational_db, tx, auth))?;
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
// but that should not pose an issue.
let mut subscriptions = self.subscriptions.write();
drop(tx);
self._remove_subscriber(sender.id, &mut subscriptions);
let subscription = match subscriptions.iter_mut().find(|s| s.queries == queries) {
Some(sub) => {
Expand Down