Skip to content

Commit 2df0cb5

Browse files
committed
Make Table::clone_structure cheaper by:
- Arcing `TableSchema`, and this has benefits elsewhere too. - Arc<[_]>ing the visitor program instructions. The data behind the Arcs very rarely change, which is the perfect case for an Arc.
1 parent b9cee3d commit 2df0cb5

File tree

20 files changed

+275
-254
lines changed

20 files changed

+275
-254
lines changed

crates/bench/benches/special.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ fn serialize_benchmarks<T: BenchTable + RandomTable>(c: &mut Criterion) {
9090
});
9191

9292
let mut table = spacetimedb_table::table::Table::new(
93-
TableDef::from_product(name, T::product_type().clone()).into_schema(TableId(0)),
93+
TableDef::from_product(name, T::product_type().clone())
94+
.into_schema(TableId(0))
95+
.into(),
9496
spacetimedb_table::indexes::SquashedOffset::COMMITTED_STATE,
9597
);
9698
let mut blob_store = spacetimedb_table::blob_store::HashMapBlobStore::default();

crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub(crate) struct CommittedState {
5252
}
5353

5454
impl StateView for CommittedState {
55-
fn get_schema(&self, table_id: &TableId) -> Option<&TableSchema> {
55+
fn get_schema(&self, table_id: &TableId) -> Option<&Arc<TableSchema>> {
5656
self.tables.get(table_id).map(|table| table.get_schema())
5757
}
5858
fn iter<'a>(&'a self, ctx: &'a ExecutionContext, table_id: &TableId) -> Result<Iter<'a>> {
@@ -62,11 +62,7 @@ impl StateView for CommittedState {
6262
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
6363
}
6464
fn table_exists(&self, table_id: &TableId) -> Option<&str> {
65-
if let Some(table) = self.tables.get(table_id) {
66-
Some(&table.schema.table_name)
67-
} else {
68-
None
69-
}
65+
self.tables.get(table_id).map(|t| &*t.schema.table_name)
7066
}
7167
/// Returns an iterator,
7268
/// yielding every row in the table identified by `table_id`,
@@ -118,7 +114,7 @@ impl CommittedState {
118114
};
119115

120116
// Insert the table row into st_tables, creating st_tables if it's missing
121-
let (st_tables, blob_store) = self.get_table_and_blob_store_or_create(ST_TABLES_ID, st_table_schema());
117+
let (st_tables, blob_store) = self.get_table_and_blob_store_or_create(ST_TABLES_ID, st_table_schema().into());
122118
// Insert the table row into `st_tables` for all system tables
123119
for schema in system_tables() {
124120
let table_id = schema.table_id;
@@ -140,7 +136,8 @@ impl CommittedState {
140136
}
141137

142138
// Insert the columns into `st_columns`
143-
let (st_columns, blob_store) = self.get_table_and_blob_store_or_create(ST_COLUMNS_ID, st_columns_schema());
139+
let (st_columns, blob_store) =
140+
self.get_table_and_blob_store_or_create(ST_COLUMNS_ID, st_columns_schema().into());
144141
for col in system_tables().into_iter().flat_map(|x| x.into_columns()) {
145142
let row = StColumnRow {
146143
table_id: col.table_id,
@@ -160,7 +157,7 @@ impl CommittedState {
160157

161158
// Insert constraints into `st_constraints`
162159
let (st_constraints, blob_store) =
163-
self.get_table_and_blob_store_or_create(ST_CONSTRAINTS_ID, st_constraints_schema());
160+
self.get_table_and_blob_store_or_create(ST_CONSTRAINTS_ID, st_constraints_schema().into());
164161
for (i, constraint) in system_tables()
165162
.into_iter()
166163
.flat_map(|x| x.constraints)
@@ -185,7 +182,8 @@ impl CommittedState {
185182
}
186183

187184
// Insert the indexes into `st_indexes`
188-
let (st_indexes, blob_store) = self.get_table_and_blob_store_or_create(ST_INDEXES_ID, st_indexes_schema());
185+
let (st_indexes, blob_store) =
186+
self.get_table_and_blob_store_or_create(ST_INDEXES_ID, st_indexes_schema().into());
189187
for (i, index) in system_tables()
190188
.into_iter()
191189
.flat_map(|x| x.indexes)
@@ -216,7 +214,7 @@ impl CommittedState {
216214

217215
// Insert the sequences into `st_sequences`
218216
let (st_sequences, blob_store) =
219-
self.get_table_and_blob_store_or_create(ST_SEQUENCES_ID, st_sequences_schema());
217+
self.get_table_and_blob_store_or_create(ST_SEQUENCES_ID, st_sequences_schema().into());
220218
// We create sequences last to get right the starting number
221219
// so, we don't sort here
222220
for (i, col) in system_tables().into_iter().flat_map(|x| x.sequences).enumerate() {
@@ -245,8 +243,8 @@ impl CommittedState {
245243
// Re-read the schema with the correct ids...
246244
let ctx = ExecutionContext::internal(database_address);
247245
for schema in system_tables() {
248-
*self.tables.get_mut(&schema.table_id).unwrap().schema =
249-
self.schema_for_table_raw(&ctx, schema.table_id)?;
246+
self.tables.get_mut(&schema.table_id).unwrap().schema =
247+
Arc::new(self.schema_for_table_raw(&ctx, schema.table_id)?);
250248
}
251249

252250
Ok(())
@@ -266,7 +264,7 @@ impl CommittedState {
266264
Ok(())
267265
}
268266

269-
pub fn replay_insert(&mut self, table_id: TableId, schema: &TableSchema, row: &ProductValue) -> Result<()> {
267+
pub fn replay_insert(&mut self, table_id: TableId, schema: &Arc<TableSchema>, row: &ProductValue) -> Result<()> {
270268
let (table, blob_store) = self.get_table_and_blob_store_or_create_ref_schema(table_id, schema);
271269
table.insert_internal(blob_store, row).map_err(TableError::Insert)?;
272270
Ok(())
@@ -332,9 +330,7 @@ impl CommittedState {
332330

333331
// Construct their schemas and insert tables for them.
334332
for table_id in table_ids {
335-
let schema = self
336-
.schema_for_table(&ExecutionContext::default(), table_id)?
337-
.into_owned();
333+
let schema = self.schema_for_table(&ExecutionContext::default(), table_id)?;
338334
self.tables
339335
.insert(table_id, Table::new(schema, SquashedOffset::COMMITTED_STATE));
340336
}
@@ -453,11 +449,8 @@ impl CommittedState {
453449
// and the fullness of the page.
454450

455451
for (table_id, mut tx_table) in insert_tables {
456-
let (commit_table, commit_blob_store) = self.get_table_and_blob_store_or_create(
457-
table_id,
458-
// TODO(perf): avoid cloning here.
459-
*tx_table.schema.clone(),
460-
);
452+
let (commit_table, commit_blob_store) =
453+
self.get_table_and_blob_store_or_create(table_id, tx_table.schema.clone());
461454

462455
// NOTE: if there is a schema change the table id will not change
463456
// and that is what is important here so it doesn't matter if we
@@ -527,13 +520,13 @@ impl CommittedState {
527520

528521
fn create_table(&mut self, table_id: TableId, schema: TableSchema) {
529522
self.tables
530-
.insert(table_id, Table::new(schema, SquashedOffset::COMMITTED_STATE));
523+
.insert(table_id, Table::new(Arc::new(schema), SquashedOffset::COMMITTED_STATE));
531524
}
532525

533526
pub fn get_table_and_blob_store_or_create_ref_schema<'this>(
534527
&'this mut self,
535528
table_id: TableId,
536-
schema: &'_ TableSchema,
529+
schema: &Arc<TableSchema>,
537530
) -> (&'this mut Table, &'this mut dyn BlobStore) {
538531
let table = self
539532
.tables
@@ -546,7 +539,7 @@ impl CommittedState {
546539
pub fn get_table_and_blob_store_or_create(
547540
&mut self,
548541
table_id: TableId,
549-
schema: TableSchema,
542+
schema: Arc<TableSchema>,
550543
) -> (&mut Table, &mut dyn BlobStore) {
551544
let table = self
552545
.tables

crates/core/src/db/datastore/locking_tx_datastore/datastore.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use crate::{
1414
Epoch, StModuleFields, StModuleRow, StTableFields, ST_MODULE_ID, ST_TABLES_ID, WASM_MODULE,
1515
},
1616
traits::{
17-
DataRow, IsolationLevel, MutProgrammable, MutTx, MutTxDatastore, Programmable, Tx, TxData, TxDatastore,
17+
DataRow, IsolationLevel, MutProgrammable, MutTx, MutTxDatastore, Programmable, RowTypeForTable, Tx,
18+
TxData, TxDatastore,
1819
},
1920
},
2021
db_metrics::{DB_METRICS, MAX_TX_CPU_TIME},
@@ -28,7 +29,7 @@ use anyhow::anyhow;
2829
use parking_lot::{Mutex, RwLock};
2930
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
3031
use spacetimedb_sats::db::def::{IndexDef, SequenceDef, TableDef, TableSchema};
31-
use spacetimedb_sats::{hash::Hash, AlgebraicValue, DataKey, ProductType, ProductValue};
32+
use spacetimedb_sats::{hash::Hash, AlgebraicValue, DataKey, ProductValue};
3233
use spacetimedb_table::{indexes::RowPointer, table::RowRef};
3334
use std::ops::RangeBounds;
3435
use std::sync::Arc;
@@ -150,10 +151,8 @@ impl Locking {
150151
let mut committed_state = self.committed_state.write_arc();
151152
for write in &transaction.writes {
152153
let table_id = TableId(write.set_id);
153-
let schema = committed_state
154-
.schema_for_table(&ExecutionContext::default(), table_id)?
155-
.into_owned();
156-
let table_name = schema.table_name.clone();
154+
let schema = committed_state.schema_for_table(&ExecutionContext::default(), table_id)?;
155+
let table_name = &schema.table_name;
157156
let row_type = schema.get_row_type();
158157

159158
let decode_row = |mut data: &[u8], source: &str| {
@@ -192,7 +191,7 @@ impl Locking {
192191
// and therefore has performance implications and must not be disabled.
193192
DB_METRICS
194193
.rdb_num_table_rows
195-
.with_label_values(&self.database_address, &table_id.into(), &table_name)
194+
.with_label_values(&self.database_address, &table_id.into(), table_name)
196195
.dec();
197196
}
198197
Operation::Insert => {
@@ -208,7 +207,7 @@ impl Locking {
208207
// and therefore has performance implications and must not be disabled.
209208
DB_METRICS
210209
.rdb_num_table_rows
211-
.with_label_values(&self.database_address, &table_id.into(), &table_name)
210+
.with_label_values(&self.database_address, &table_id.into(), table_name)
212211
.inc();
213212
}
214213
}
@@ -290,11 +289,11 @@ impl TxDatastore for Locking {
290289
Ok(tx.table_exists(&table_id).map(Cow::Borrowed))
291290
}
292291

293-
fn schema_for_table_tx<'tx>(&self, tx: &'tx Self::Tx, table_id: TableId) -> Result<Cow<'tx, TableSchema>> {
292+
fn schema_for_table_tx(&self, tx: &Self::Tx, table_id: TableId) -> Result<Arc<TableSchema>> {
294293
tx.schema_for_table(&ExecutionContext::internal(self.database_address), table_id)
295294
}
296295

297-
fn get_all_tables_tx<'tx>(&self, ctx: &ExecutionContext, tx: &'tx Self::Tx) -> Result<Vec<Cow<'tx, TableSchema>>> {
296+
fn get_all_tables_tx(&self, ctx: &ExecutionContext, tx: &Self::Tx) -> Result<Vec<Arc<TableSchema>>> {
298297
self.iter_tx(ctx, tx, ST_TABLES_ID)?
299298
.map(|row_ref| {
300299
let table_id = row_ref.read_col(StTableFields::TableId)?;
@@ -322,15 +321,15 @@ impl MutTxDatastore for Locking {
322321
/// reflect the schema of the table.q
323322
///
324323
/// This function is known to be called quite frequently.
325-
fn row_type_for_table_mut_tx<'tx>(&self, tx: &'tx Self::MutTx, table_id: TableId) -> Result<Cow<'tx, ProductType>> {
324+
fn row_type_for_table_mut_tx<'tx>(&self, tx: &'tx Self::MutTx, table_id: TableId) -> Result<RowTypeForTable<'tx>> {
326325
tx.row_type_for_table(table_id, self.database_address)
327326
}
328327

329328
/// IMPORTANT! This function is relatively expensive, and much more
330329
/// expensive than `row_type_for_table_mut_tx`. Prefer
331330
/// `row_type_for_table_mut_tx` if you only need to access the `ProductType`
332331
/// of the table.
333-
fn schema_for_table_mut_tx<'tx>(&self, tx: &'tx Self::MutTx, table_id: TableId) -> Result<Cow<'tx, TableSchema>> {
332+
fn schema_for_table_mut_tx(&self, tx: &Self::MutTx, table_id: TableId) -> Result<Arc<TableSchema>> {
334333
tx.schema_for_table(&ExecutionContext::internal(self.database_address), table_id)
335334
}
336335

@@ -1220,7 +1219,7 @@ mod tests {
12201219
datastore.commit_mut_tx_for_test(tx)?;
12211220

12221221
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable);
1223-
let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?.into_owned();
1222+
let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?;
12241223

12251224
for index in &*schema.indexes {
12261225
datastore.drop_index_mut_tx(&mut tx, index.index_id)?;

0 commit comments

Comments
 (0)