Skip to content

Commit cc36a2e

Browse files
perf: read DataKey instead of recomputing it in subscriptions (#276)
Fixes #259 (1) Updates MemTable to use RelValue instead of ProductValue (2) Adds a DataKey member to DataRef and RelValue (3) Subscriptions compute DataKey only when not present on row
1 parent 8f23b6a commit cc36a2e

File tree

14 files changed

+176
-48
lines changed

14 files changed

+176
-48
lines changed

crates/client-api/src/routes/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ pub async fn sql(
557557
.into_iter()
558558
.map(|result| StmtResultJson {
559559
schema: result.head.ty(),
560-
rows: result.data.into_iter().map(|x| x.elements).collect::<Vec<_>>(),
560+
rows: result.data.into_iter().map(|x| x.data.elements).collect::<Vec<_>>(),
561561
})
562562
.collect::<Vec<_>>();
563563

crates/core/src/db/datastore/locking_tx_datastore/mod.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,29 @@ impl traits::Data for Data {
9292
}
9393
}
9494

95+
/// A `DataRef` represents a row stored in a table.
96+
///
97+
/// A table row always has a [`DataKey`] associated with it.
98+
/// This is in contrast to rows that are materialized during query execution
99+
/// which may or may not have an associated `DataKey`.
95100
#[derive(Clone)]
96101
pub struct DataRef {
102+
id: DataKey,
97103
data: ProductValue,
98104
}
99105

100106
impl DataRef {
101-
fn new(data: ProductValue) -> Self {
102-
Self { data }
107+
fn new(id: DataKey, data: ProductValue) -> Self {
108+
Self { id, data }
103109
}
104110

105111
pub fn view(&self) -> &ProductValue {
106112
&self.data
107113
}
114+
115+
pub fn id(&self) -> &DataKey {
116+
&self.id
117+
}
108118
}
109119

110120
pub struct MutTxId {
@@ -1371,7 +1381,7 @@ impl Inner {
13711381
match self.tx_state.as_ref().unwrap().get_row_op(table_id, row_id) {
13721382
RowState::Committed(_) => unreachable!("a row cannot be committed in a tx state"),
13731383
RowState::Insert(row) => {
1374-
return Ok(Some(DataRef::new(row)));
1384+
return Ok(Some(DataRef::new(row_id.0, row)));
13751385
}
13761386
RowState::Delete => {
13771387
return Ok(None);
@@ -1383,7 +1393,7 @@ impl Inner {
13831393
.tables
13841394
.get(table_id)
13851395
.and_then(|table| table.get_row(row_id))
1386-
.map(|row| DataRef::new(row.clone())))
1396+
.map(|row| DataRef::new(row_id.0, row.clone())))
13871397
}
13881398

13891399
fn get_row_type(&self, table_id: &TableId) -> Option<&ProductType> {
@@ -1741,10 +1751,10 @@ impl Iterator for Iter<'_> {
17411751
Some(RowState::Insert(_)) => (), // Do nothing, we'll get it in the next stage
17421752
Some(RowState::Delete) => (), // Skip it, it's been deleted
17431753
Some(RowState::Absent) => {
1744-
return Some(DataRef::new(row.clone()));
1754+
return Some(DataRef::new(row_id.0, row.clone()));
17451755
}
17461756
None => {
1747-
return Some(DataRef::new(row.clone()));
1757+
return Some(DataRef::new(row_id.0, row.clone()));
17481758
}
17491759
}
17501760
}
@@ -1762,8 +1772,8 @@ impl Iterator for Iter<'_> {
17621772
}
17631773
}
17641774
ScanStage::CurrentTx { iter } => {
1765-
if let Some((_, row)) = iter.next() {
1766-
return Some(DataRef::new(row.clone()));
1775+
if let Some((id, row)) = iter.next() {
1776+
return Some(DataRef::new(id.0, row.clone()));
17671777
}
17681778
break;
17691779
}
@@ -1854,6 +1864,7 @@ impl Iterator for IndexSeekIterInner<'_> {
18541864
fn next(&mut self) -> Option<Self::Item> {
18551865
if let Some(row_id) = self.inserted_rows.next() {
18561866
return Some(DataRef::new(
1867+
row_id.0,
18571868
self.tx_state.get_row(&self.table_id, &row_id).unwrap().clone(),
18581869
));
18591870
}
@@ -1902,7 +1913,10 @@ impl Iterator for CommittedIndexIterByColEq<'_> {
19021913
/// Retrieve a committed row. Panics if `table_id` and `row_id` do not identify an actually
19031914
/// present row.
19041915
fn get_committed_row(state: &CommittedState, table_id: &TableId, row_id: &RowId) -> DataRef {
1905-
DataRef::new(state.tables.get(table_id).unwrap().get_row(row_id).unwrap().clone())
1916+
DataRef::new(
1917+
row_id.0,
1918+
state.tables.get(table_id).unwrap().get_row(row_id).unwrap().clone(),
1919+
)
19061920
}
19071921

19081922
pub enum IterByColRange<'a, R: RangeBounds<AlgebraicValue>> {

crates/core/src/host/instance_env.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ impl InstanceEnv {
427427
_ => unreachable!("query should always return a table"),
428428
};
429429
Ok(std::iter::once(bsatn::to_vec(&row_type))
430-
.chain(results.data.into_iter().map(|row| bsatn::to_vec(&row)))
430+
.chain(results.data.into_iter().map(|row| bsatn::to_vec(&row.data)))
431431
.map(|bytes| bytes.expect("encoding algebraic values should never fail")))
432432
}
433433
}

crates/core/src/sql/execute.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,10 @@ pub(crate) mod tests {
103103
use crate::db::relational_db::tests_utils::make_test_db;
104104
use crate::db::relational_db::{ST_TABLES_ID, ST_TABLES_NAME};
105105
use crate::vm::tests::create_table_with_rows;
106+
use itertools::Itertools;
106107
use spacetimedb_lib::auth::{StAccess, StTableType};
107108
use spacetimedb_lib::error::ResultTest;
108-
use spacetimedb_lib::relation::Header;
109+
use spacetimedb_lib::relation::{Header, RelValue};
109110
use spacetimedb_sats::{product, AlgebraicType, BuiltinType, ProductType};
110111
use spacetimedb_vm::dsl::{mem_table, scalar};
111112
use spacetimedb_vm::eval::create_game_data;
@@ -357,9 +358,27 @@ pub(crate) mod tests {
357358
let (db, _tmp_dir) = make_test_db()?;
358359

359360
let mut tx = db.begin_tx();
360-
create_table_with_rows(&db, &mut tx, "Inventory", data.inv.head.into(), &data.inv.data)?;
361-
create_table_with_rows(&db, &mut tx, "Player", data.player.head.into(), &data.player.data)?;
362-
create_table_with_rows(&db, &mut tx, "Location", data.location.head.into(), &data.location.data)?;
361+
create_table_with_rows(
362+
&db,
363+
&mut tx,
364+
"Inventory",
365+
data.inv.head.into(),
366+
&data.inv.data.iter().map(|row| row.data.clone()).collect_vec(),
367+
)?;
368+
create_table_with_rows(
369+
&db,
370+
&mut tx,
371+
"Player",
372+
data.player.head.into(),
373+
&data.player.data.iter().map(|row| row.data.clone()).collect_vec(),
374+
)?;
375+
create_table_with_rows(
376+
&db,
377+
&mut tx,
378+
"Location",
379+
data.location.head.into(),
380+
&data.location.data.iter().map(|row| row.data.clone()).collect_vec(),
381+
)?;
363382

364383
let result = &run_for_testing(
365384
&db,
@@ -427,7 +446,7 @@ pub(crate) mod tests {
427446
let mut result = result.first().unwrap().clone();
428447

429448
let row = product!(scalar(2u64), scalar("test"));
430-
input.data.push(row);
449+
input.data.push(RelValue::new(&input.head, &row, None));
431450
input.data.sort();
432451
result.data.sort();
433452

@@ -509,7 +528,7 @@ pub(crate) mod tests {
509528

510529
let mut change = input;
511530
change.data.clear();
512-
change.data.push(row);
531+
change.data.push(RelValue::new(&change.head, &row, None));
513532

514533
assert_eq!(
515534
change.as_without_table_name(),
@@ -526,7 +545,7 @@ pub(crate) mod tests {
526545
.map(|x| {
527546
x.data
528547
.into_iter()
529-
.map(|x| x.field_as_str(1, None).unwrap().to_string())
548+
.map(|x| x.data.field_as_str(1, None).unwrap().to_string())
530549
.collect::<Vec<_>>()
531550
})
532551
.collect();

crates/core/src/subscription/query.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use crate::sql::compiler::compile_sql;
66
use crate::sql::execute::execute_single_sql;
77
use crate::subscription::subscription::QuerySet;
88
use spacetimedb_lib::identity::AuthCtx;
9-
use spacetimedb_lib::relation::{Column, FieldName, MemTable};
9+
use spacetimedb_lib::relation::{Column, FieldName, MemTable, RelValue};
10+
use spacetimedb_lib::DataKey;
1011
use spacetimedb_sats::AlgebraicType;
1112
use spacetimedb_vm::expr::{Crud, CrudExpr, DbType, QueryExpr, SourceExpr};
1213

@@ -53,7 +54,8 @@ pub fn to_mem_table(of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
5354
t.data.extend(data.ops.iter().map(|row| {
5455
let mut new = row.row.clone();
5556
new.elements[pos] = row.op_type.into();
56-
new
57+
let mut bytes: &[u8] = row.row_pk.as_ref();
58+
RelValue::new(&t.head, &new, Some(DataKey::decode(&mut bytes).unwrap()))
5759
}));
5860
} else {
5961
t.head.fields.push(Column::new(
@@ -63,7 +65,9 @@ pub fn to_mem_table(of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
6365
for row in &data.ops {
6466
let mut new = row.row.clone();
6567
new.elements.push(row.op_type.into());
66-
t.data.push(new);
68+
let mut bytes: &[u8] = row.row_pk.as_ref();
69+
t.data
70+
.push(RelValue::new(&t.head, &new, Some(DataKey::decode(&mut bytes).unwrap())));
6771
}
6872
}
6973

@@ -305,6 +309,46 @@ mod tests {
305309
Ok(())
306310
}
307311

312+
#[test]
313+
fn test_eval_incr_maintains_row_ids() -> ResultTest<()> {
314+
let (db, _) = make_test_db()?;
315+
let mut tx = db.begin_tx();
316+
317+
let schema = ProductType::from_iter([("u8", BuiltinType::U8)]);
318+
let row = product!(1u8);
319+
320+
// generate row id from row
321+
let id1 = &row.to_data_key().to_bytes();
322+
323+
// create empty table "test"
324+
let table_id = create_table_with_rows(&db, &mut tx, "test", schema.clone(), &[])?;
325+
326+
// select * from test
327+
let query = QueryExpr::new(db_table(schema.clone(), "test", table_id));
328+
let query = QuerySet(vec![Query { queries: vec![query] }]);
329+
330+
let op = TableOp {
331+
op_type: 0,
332+
row_pk: id1.clone(),
333+
row: row.clone(),
334+
};
335+
336+
let update = DatabaseTableUpdate {
337+
table_id,
338+
table_name: "test".into(),
339+
ops: vec![op],
340+
};
341+
342+
let update = DatabaseUpdate { tables: vec![update] };
343+
344+
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;
345+
let id2 = &result.tables[0].ops[0].row_pk;
346+
347+
// check that both row ids are the same
348+
assert_eq!(id1, id2);
349+
Ok(())
350+
}
351+
308352
#[test]
309353
fn test_subscribe() -> ResultTest<()> {
310354
let (db, _tmp_dir) = make_test_db()?;

crates/core/src/subscription/subscription.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use spacetimedb_lib::auth::{StAccess, StTableType};
22
use spacetimedb_lib::identity::AuthCtx;
3+
use spacetimedb_lib::relation::RelValue;
4+
use spacetimedb_lib::PrimaryKey;
35
use spacetimedb_sats::{AlgebraicValue, BuiltinValue};
46
use spacetimedb_vm::expr::QueryExpr;
57
use std::collections::HashSet;
@@ -50,6 +52,15 @@ impl Subscription {
5052
}
5153
}
5254

55+
// If a RelValue has an id (DataKey) return it directly, otherwise we must construct it from the
56+
// row itself which can be an expensive operation.
57+
fn pk_for_row(row: &RelValue) -> PrimaryKey {
58+
match row.id {
59+
Some(data_key) => PrimaryKey { data_key },
60+
None => RelationalDB::pk_for_row(&row.data),
61+
}
62+
}
63+
5364
impl QuerySet {
5465
/// Queries all the [`StTableType::User`] tables *right now*
5566
/// and turns them into [`QueryExpr`],
@@ -103,14 +114,14 @@ impl QuerySet {
103114
//Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table`
104115
// Needs to be done before calculating the PK.
105116
let op_type = if let AlgebraicValue::Builtin(BuiltinValue::U8(op)) =
106-
row.elements.remove(pos_op_type)
117+
row.data.elements.remove(pos_op_type)
107118
{
108119
op
109120
} else {
110121
panic!("Fail to extract `{OP_TYPE_FIELD_NAME}` on `{}`", result.head.table_name)
111122
};
112123

113-
let row_pk = RelationalDB::pk_for_row(&row);
124+
let row_pk = pk_for_row(&row);
114125

115126
//Skip rows that are already resolved in a previous subscription...
116127
if seen.contains(&(table.table_id, row_pk)) {
@@ -120,6 +131,7 @@ impl QuerySet {
120131
seen.insert((table.table_id, row_pk));
121132

122133
let row_pk = row_pk.to_bytes();
134+
let row = row.data;
123135
table_row_operations.ops.push(TableOp { op_type, row_pk, row });
124136
}
125137
output.tables.push(table_row_operations);
@@ -156,7 +168,7 @@ impl QuerySet {
156168
let mut table_row_operations = Vec::new();
157169

158170
for row in table.data {
159-
let row_pk = RelationalDB::pk_for_row(&row);
171+
let row_pk = pk_for_row(&row);
160172

161173
//Skip rows that are already resolved in a previous subscription...
162174
if seen.contains(&(t.table_id, row_pk)) {
@@ -165,6 +177,7 @@ impl QuerySet {
165177
seen.insert((t.table_id, row_pk));
166178

167179
let row_pk = row_pk.to_bytes();
180+
let row = row.data;
168181
table_row_operations.push(TableOp {
169182
op_type: 1, // Insert
170183
row_pk,

crates/core/src/vm.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::db::datastore::locking_tx_datastore::MutTxId;
44
use crate::db::datastore::traits::{ColumnDef, IndexDef, IndexId, SequenceId, TableDef};
55
use crate::db::relational_db::RelationalDB;
66
use crate::error::DBError;
7+
use itertools::Itertools;
78
use spacetimedb_lib::auth::{StAccess, StTableType};
89
use spacetimedb_lib::identity::AuthCtx;
910
use spacetimedb_lib::relation::{FieldExpr, Relation};
@@ -177,15 +178,19 @@ impl<'db, 'tx> DbProgram<'db, 'tx> {
177178
let result = self._eval_query(query)?;
178179

179180
match result {
180-
Code::Table(result) => self._execute_delete(&table, result.data),
181+
Code::Table(result) => {
182+
self._execute_delete(&table, result.data.into_iter().map(|row| row.data).collect_vec())
183+
}
181184
_ => Ok(result),
182185
}
183186
}
184187

185188
fn insert_query(&mut self, table: &Table, query: QueryCode) -> Result<Code, ErrorVm> {
186189
let result = self._eval_query(query)?;
187190
match result {
188-
Code::Table(result) => self._execute_insert(table, result.data),
191+
Code::Table(result) => {
192+
self._execute_insert(table, result.data.into_iter().map(|row| row.data).collect_vec())
193+
}
189194
_ => Ok(result),
190195
}
191196
}
@@ -282,9 +287,12 @@ impl ProgramVm for DbProgram<'_, '_> {
282287
Code::Table(result) => result,
283288
_ => return Ok(result),
284289
};
285-
self._execute_delete(&table, deleted.data.clone())?;
290+
self._execute_delete(
291+
&table,
292+
deleted.data.clone().into_iter().map(|row| row.data).collect_vec(),
293+
)?;
286294

287-
let to_insert = mem_table(table.head(), deleted.data);
295+
let to_insert = mem_table(table.head(), deleted.data.into_iter().map(|row| row.data));
288296
insert.table = Table::MemTable(to_insert);
289297

290298
let result = self.insert_query(&table, insert)?;
@@ -335,7 +343,7 @@ impl RelOps for TableCursor<'_> {
335343
#[tracing::instrument(skip_all)]
336344
fn next(&mut self) -> Result<Option<RelValue>, ErrorVm> {
337345
if let Some(row) = self.iter.next() {
338-
return Ok(Some(RelValue::new(self.head(), row.view())));
346+
return Ok(Some(RelValue::new(self.head(), row.view(), Some(*row.id()))));
339347
};
340348
Ok(None)
341349
}
@@ -362,7 +370,7 @@ where
362370
#[tracing::instrument(skip_all)]
363371
fn next(&mut self) -> Result<Option<RelValue>, ErrorVm> {
364372
if let Some(row) = self.iter.next() {
365-
return Ok(Some(RelValue::new(self.head(), &row)));
373+
return Ok(Some(RelValue::new(self.head(), &row, None)));
366374
};
367375
Ok(None)
368376
}

crates/lib/src/primary_key.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ use std::fmt;
33
use crate::buffer::{BufReader, BufWriter, DecodeError};
44
use crate::DataKey;
55

6+
// TODO(280): Remove PrimaryKey.
7+
// PrimaryKey is a wrapper for DataKey which identifies each row in the database.
8+
// It is not a column and therefore does not correspond to a traditional primary key
9+
// in a relational database.
610
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
711
pub struct PrimaryKey {
812
pub data_key: DataKey,

0 commit comments

Comments
 (0)