Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ impl TxDatastore for Locking {
tx.table_id_from_name(table_name, self.database_address)
}

fn table_name_from_id_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result<Option<Cow<'a, str>>> {
Ok(tx.table_exists(&table_id).map(Cow::Borrowed))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is okay - the mut version does a db scan but this seems simpler but maybe it can be out of sync with the system table? idk, but this is fine for now.

}

fn schema_for_table_tx<'tx>(&self, tx: &'tx Self::Tx, table_id: TableId) -> Result<Cow<'tx, TableSchema>> {
tx.schema_for_table(&ExecutionContext::internal(self.database_address), table_id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub(crate) trait StateView {

fn iter<'a>(&'a self, ctx: &'a ExecutionContext, table_id: &TableId) -> Result<Iter<'a>>;

// TODO(noa): rename to table_name, and TableId doesn't need to be a reference
fn table_exists(&self, table_id: &TableId) -> Option<&str>;

/// Returns an iterator,
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub trait TxDatastore: DataRow + Tx {

fn table_id_exists_tx(&self, tx: &Self::Tx, table_id: &TableId) -> bool;
fn table_id_from_name_tx(&self, tx: &Self::Tx, table_name: &str) -> Result<Option<TableId>>;
fn table_name_from_id_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result<Option<Cow<'a, str>>>;
fn schema_for_table_tx<'tx>(&self, tx: &'tx Self::Tx, table_id: TableId) -> super::Result<Cow<'tx, TableSchema>>;
fn get_all_tables_tx<'tx>(
&self,
Expand Down
20 changes: 12 additions & 8 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,9 @@ impl RelationalDB {
/// TODO(jgilles): when we support actual read-only transactions, use those here instead.
/// TODO(jgilles, kim): get this merged with the above function (two people had similar ideas
/// at the same time)
pub fn with_read_only<F, A, E>(&self, ctx: &ExecutionContext, f: F) -> Result<A, E>
pub fn with_read_only<F, T>(&self, ctx: &ExecutionContext, f: F) -> T
where
F: FnOnce(&mut Tx) -> Result<A, E>,
E: From<DBError>,
F: FnOnce(&mut Tx) -> T,
{
let mut tx = self.inner.begin_tx();
let res = f(&mut tx);
Expand Down Expand Up @@ -470,7 +469,7 @@ impl RelationalDB {
.with_label_values(&table_id.0)
.start_timer();
let table_name = self
.table_name_from_id(ctx, tx, table_id)?
.table_name_from_id_mut(ctx, tx, table_id)?
.map(|name| name.to_string())
.unwrap_or_default();
self.inner.drop_table_mut_tx(tx, table_id).map(|_| {
Expand Down Expand Up @@ -512,7 +511,12 @@ impl RelationalDB {
}

#[tracing::instrument(skip_all)]
pub fn table_name_from_id<'a>(
pub fn table_name_from_id<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result<Option<Cow<'a, str>>, DBError> {
self.inner.table_name_from_id_tx(tx, table_id)
}

#[tracing::instrument(skip_all)]
pub fn table_name_from_id_mut<'a>(
&'a self,
ctx: &'a ExecutionContext,
tx: &'a MutTx,
Expand Down Expand Up @@ -1058,10 +1062,10 @@ mod tests {

let ctx = ExecutionContext::default();

let result = stdb.table_name_from_id(&ctx, &tx, table_id)?;
let result = stdb.table_name_from_id_mut(&ctx, &tx, table_id)?;
assert!(
result.is_none(),
"Table should not exist, so table_name_from_id should return none",
"Table should not exist, so table_name_from_id_mut should return none",
);
Ok(())
}
Expand Down Expand Up @@ -1477,7 +1481,7 @@ mod tests {

let table_id = stdb.create_table(&mut tx, schema)?;
stdb.rename_table(&mut tx, table_id, "YourTable")?;
let table_name = stdb.table_name_from_id(&ctx, &tx, table_id)?;
let table_name = stdb.table_name_from_id_mut(&ctx, &tx, table_id)?;

assert_eq!(Some("YourTable"), table_name.as_ref().map(Cow::as_ref));
// Also make sure we've removed the old ST_TABLES_ID row
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl InstanceEnv {
value: _,
})) => {}
_ => {
let res = stdb.table_name_from_id(ctx, tx, table_id);
let res = stdb.table_name_from_id_mut(ctx, tx, table_id);
if let Ok(Some(table_name)) = res {
log::debug!("insert(table: {table_name}, table_id: {table_id}): {e}")
} else {
Expand Down
49 changes: 16 additions & 33 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -46,45 +45,29 @@ impl DatabaseUpdate {

pub fn from_writes(stdb: &RelationalDB, tx_data: &TxData) -> Self {
let mut map: HashMap<TableId, Vec<TableOp>> = HashMap::new();
//TODO: This should be wrapped with .auto_commit
let tx = stdb.begin_mut_tx();
for record in tx_data.records.iter() {
let op = match record.op {
TxOp::Delete => 0,
TxOp::Insert(_) => 1,
};

let vec = if let Some(vec) = map.get_mut(&record.table_id) {
vec
} else {
map.insert(record.table_id, Vec::new());
map.get_mut(&record.table_id).unwrap()
};

let (row, row_pk) = (record.product_value.clone(), record.key.to_bytes());
let vec = map.entry(record.table_id).or_default();

vec.push(TableOp {
op_type: op,
row_pk,
row,
op_type: match record.op {
TxOp::Delete => 0,
TxOp::Insert(_) => 1,
},
row_pk: record.key.to_bytes(),
row: record.product_value.clone(),
});
}

let ctx = ExecutionContext::internal(stdb.address());
let mut table_name_map: HashMap<TableId, Cow<'_, str>> = HashMap::new();
let mut table_updates = Vec::new();
for (table_id, table_row_operations) in map.drain() {
let table_name = table_name_map
.entry(table_id)
.or_insert_with(|| stdb.table_name_from_id(&ctx, &tx, table_id).unwrap().unwrap());
let table_name: &str = table_name.as_ref();
table_updates.push(DatabaseTableUpdate {
table_id,
table_name: table_name.to_owned(),
ops: table_row_operations,
});
}
stdb.rollback_mut_tx(&ctx, tx);
let table_updates = stdb.with_read_only(&ctx, |tx| {
map.into_iter()
.map(|(table_id, ops)| DatabaseTableUpdate {
table_id,
table_name: stdb.table_name_from_id(tx, table_id).unwrap().unwrap().into_owned(),
ops,
})
.collect()
});

DatabaseUpdate { tables: table_updates }
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ impl ModuleSubscriptions {
.with_label_values(&self.relational_db.address())
.inc();

let sender = subscription.subscribers().last().unwrap();
let sender = subscription.subscribers().last().unwrap().clone();
drop(subscriptions);
// NOTE: It is important to send the state in this thread because if you spawn a new
// thread it's possible for messages to get sent to the client out of order. If you do
// spawn in another thread messages will need to be buffered until the state is sent out
Expand Down