diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 249c7322ae2..28b834aa90b 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -282,6 +282,10 @@ impl TxDatastore for Locking { tx.table_id_from_name(table_name, self.database_address) } + fn table_name_from_id_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>> { + Ok(tx.table_exists(&table_id).map(Cow::Borrowed)) + } + fn schema_for_table_tx<'tx>(&self, tx: &'tx Self::Tx, table_id: TableId) -> Result> { tx.schema_for_table(&ExecutionContext::internal(self.database_address), table_id) } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 407d3a88ec5..8a7a4432081 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -42,6 +42,7 @@ pub(crate) trait StateView { fn iter<'a>(&'a self, ctx: &'a ExecutionContext, table_id: &TableId) -> Result>; + // TODO(noa): rename to table_name, and TableId doesn't need to be a reference fn table_exists(&self, table_id: &TableId) -> Option<&str>; /// Returns an iterator, diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 0b49ee40769..1e678bd53d3 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -113,6 +113,7 @@ pub trait TxDatastore: DataRow + Tx { fn table_id_exists_tx(&self, tx: &Self::Tx, table_id: &TableId) -> bool; fn table_id_from_name_tx(&self, tx: &Self::Tx, table_name: &str) -> Result>; + fn table_name_from_id_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>>; fn schema_for_table_tx<'tx>(&self, tx: &'tx Self::Tx, table_id: TableId) -> super::Result>; fn get_all_tables_tx<'tx>( &self, diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 2dfdd86c9d6..976071ac0f0 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -386,10 +386,9 @@ impl RelationalDB { /// TODO(jgilles): when we support actual read-only transactions, use those here instead. /// TODO(jgilles, kim): get this merged with the above function (two people had similar ideas /// at the same time) - pub fn with_read_only(&self, ctx: &ExecutionContext, f: F) -> Result + pub fn with_read_only(&self, ctx: &ExecutionContext, f: F) -> T where - F: FnOnce(&mut Tx) -> Result, - E: From, + F: FnOnce(&mut Tx) -> T, { let mut tx = self.inner.begin_tx(); let res = f(&mut tx); @@ -470,7 +469,7 @@ impl RelationalDB { .with_label_values(&table_id.0) .start_timer(); let table_name = self - .table_name_from_id(ctx, tx, table_id)? + .table_name_from_id_mut(ctx, tx, table_id)? .map(|name| name.to_string()) .unwrap_or_default(); self.inner.drop_table_mut_tx(tx, table_id).map(|_| { @@ -512,7 +511,12 @@ impl RelationalDB { } #[tracing::instrument(skip_all)] - pub fn table_name_from_id<'a>( + pub fn table_name_from_id<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result>, DBError> { + self.inner.table_name_from_id_tx(tx, table_id) + } + + #[tracing::instrument(skip_all)] + pub fn table_name_from_id_mut<'a>( &'a self, ctx: &'a ExecutionContext, tx: &'a MutTx, @@ -1058,10 +1062,10 @@ mod tests { let ctx = ExecutionContext::default(); - let result = stdb.table_name_from_id(&ctx, &tx, table_id)?; + let result = stdb.table_name_from_id_mut(&ctx, &tx, table_id)?; assert!( result.is_none(), - "Table should not exist, so table_name_from_id should return none", + "Table should not exist, so table_name_from_id_mut should return none", ); Ok(()) } @@ -1477,7 +1481,7 @@ mod tests { let table_id = stdb.create_table(&mut tx, schema)?; stdb.rename_table(&mut tx, table_id, "YourTable")?; - let table_name = stdb.table_name_from_id(&ctx, &tx, table_id)?; + let table_name = stdb.table_name_from_id_mut(&ctx, &tx, table_id)?; assert_eq!(Some("YourTable"), table_name.as_ref().map(Cow::as_ref)); // Also make sure we've removed the old ST_TABLES_ID row diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 86688cec9fb..a119360097e 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -133,7 +133,7 @@ impl InstanceEnv { value: _, })) => {} _ => { - let res = stdb.table_name_from_id(ctx, tx, table_id); + let res = stdb.table_name_from_id_mut(ctx, tx, table_id); if let Ok(Some(table_name)) = res { log::debug!("insert(table: {table_name}, table_id: {table_id}): {e}") } else { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index ad060325f1a..d667d325cd5 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Weak}; @@ -46,45 +45,29 @@ impl DatabaseUpdate { pub fn from_writes(stdb: &RelationalDB, tx_data: &TxData) -> Self { let mut map: HashMap> = HashMap::new(); - //TODO: This should be wrapped with .auto_commit - let tx = stdb.begin_mut_tx(); for record in tx_data.records.iter() { - let op = match record.op { - TxOp::Delete => 0, - TxOp::Insert(_) => 1, - }; - - let vec = if let Some(vec) = map.get_mut(&record.table_id) { - vec - } else { - map.insert(record.table_id, Vec::new()); - map.get_mut(&record.table_id).unwrap() - }; - - let (row, row_pk) = (record.product_value.clone(), record.key.to_bytes()); + let vec = map.entry(record.table_id).or_default(); vec.push(TableOp { - op_type: op, - row_pk, - row, + op_type: match record.op { + TxOp::Delete => 0, + TxOp::Insert(_) => 1, + }, + row_pk: record.key.to_bytes(), + row: record.product_value.clone(), }); } let ctx = ExecutionContext::internal(stdb.address()); - let mut table_name_map: HashMap> = HashMap::new(); - let mut table_updates = Vec::new(); - for (table_id, table_row_operations) in map.drain() { - let table_name = table_name_map - .entry(table_id) - .or_insert_with(|| stdb.table_name_from_id(&ctx, &tx, table_id).unwrap().unwrap()); - let table_name: &str = table_name.as_ref(); - table_updates.push(DatabaseTableUpdate { - table_id, - table_name: table_name.to_owned(), - ops: table_row_operations, - }); - } - stdb.rollback_mut_tx(&ctx, tx); + let table_updates = stdb.with_read_only(&ctx, |tx| { + map.into_iter() + .map(|(table_id, ops)| DatabaseTableUpdate { + table_id, + table_name: stdb.table_name_from_id(tx, table_id).unwrap().unwrap().into_owned(), + ops, + }) + .collect() + }); DatabaseUpdate { tables: table_updates } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index b42da18a7e2..e87d8bafe41 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -78,7 +78,8 @@ impl ModuleSubscriptions { .with_label_values(&self.relational_db.address()) .inc(); - let sender = subscription.subscribers().last().unwrap(); + let sender = subscription.subscribers().last().unwrap().clone(); + drop(subscriptions); // NOTE: It is important to send the state in this thread because if you spawn a new // thread it's possible for messages to get sent to the client out of order. If you do // spawn in another thread messages will need to be buffered until the state is sent out