Skip to content

Commit 5a23fad

Browse files
perf(553): Optimize index joins for incremental evaluation
Fixes #553. Before this change, we would use the same plan for both query and incremental eval. This is problematic for index joins. In particular, table sizes are drastically different under incremental eval. After this change, joins are reordered for incremental eval.
1 parent 7058a10 commit 5a23fad

File tree

5 files changed

+286
-22
lines changed

5 files changed

+286
-22
lines changed

crates/core/src/sql/compiler.rs

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,15 @@ mod tests {
295295
use std::ops::Bound;
296296

297297
use crate::db::relational_db::tests_utils::make_test_db;
298+
use crate::host::module_host::{DatabaseTableUpdate, TableOp};
299+
use crate::subscription::query;
298300
use spacetimedb_lib::error::ResultTest;
299301
use spacetimedb_lib::operator::OpQuery;
300302
use spacetimedb_primitives::TableId;
303+
use spacetimedb_sats::data_key::ToDataKey;
301304
use spacetimedb_sats::db::def::{ColumnDef, IndexDef, TableDef};
302-
use spacetimedb_sats::AlgebraicType;
305+
use spacetimedb_sats::relation::MemTable;
306+
use spacetimedb_sats::{product, AlgebraicType};
303307
use spacetimedb_vm::expr::{IndexJoin, IndexScan, JoinExpr, Query};
304308

305309
fn create_table(
@@ -1025,4 +1029,96 @@ mod tests {
10251029
};
10261030
Ok(())
10271031
}
1032+
1033+
#[test]
1034+
fn compile_incremental_index_join() -> ResultTest<()> {
1035+
let (db, _) = make_test_db()?;
1036+
let mut tx = db.begin_tx();
1037+
1038+
// Create table [lhs] with index on [b]
1039+
let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
1040+
let indexes = &[(1.into(), "b")];
1041+
let lhs_id = create_table(&db, &mut tx, "lhs", schema, indexes)?;
1042+
1043+
// Create table [rhs] with indexes on [b] and [c]
1044+
let schema = &[
1045+
("b", AlgebraicType::U64),
1046+
("c", AlgebraicType::U64),
1047+
("d", AlgebraicType::U64),
1048+
];
1049+
let indexes = &[(0.into(), "b"), (1.into(), "c")];
1050+
let rhs_id = create_table(&db, &mut tx, "rhs", schema, indexes)?;
1051+
1052+
// Should generate an index join since there is an index on `lhs.b`.
1053+
// Should push the sargable range condition into the index join's probe side.
1054+
let sql = "select lhs.* from lhs join rhs on lhs.b = rhs.b where rhs.c > 2 and rhs.c < 4 and rhs.d = 3";
1055+
let exp = compile_sql(&db, &tx, sql)?.remove(0);
1056+
1057+
let CrudExpr::Query(expr) = exp else {
1058+
panic!("unexpected result from compilation: {:#?}", exp);
1059+
};
1060+
1061+
// Create an insert for an incremental update.
1062+
let row = product!(0u64, 0u64);
1063+
let insert = TableOp {
1064+
op_type: 1,
1065+
row_pk: row.to_data_key().to_bytes(),
1066+
row,
1067+
};
1068+
let insert = DatabaseTableUpdate {
1069+
table_id: lhs_id,
1070+
table_name: String::from("lhs"),
1071+
ops: vec![insert],
1072+
};
1073+
1074+
// Optimize the query plan for the incremental update.
1075+
let expr = query::to_mem_table(expr, &insert);
1076+
let expr = expr.optimize();
1077+
1078+
let QueryExpr {
1079+
source:
1080+
SourceExpr::MemTable(MemTable {
1081+
head: Header { table_name, .. },
1082+
..
1083+
}),
1084+
query,
1085+
..
1086+
} = expr
1087+
else {
1088+
panic!("unexpected result after optimization: {:#?}", expr);
1089+
};
1090+
1091+
assert_eq!(table_name, "lhs");
1092+
assert_eq!(query.len(), 1);
1093+
1094+
let Query::IndexJoin(IndexJoin {
1095+
probe_side:
1096+
QueryExpr {
1097+
source: SourceExpr::MemTable(_),
1098+
query: ref lhs,
1099+
},
1100+
probe_field:
1101+
FieldName::Name {
1102+
table: ref probe_table,
1103+
field: ref probe_field,
1104+
},
1105+
index_header: _,
1106+
index_select: Some(_),
1107+
index_table,
1108+
index_col,
1109+
return_index_rows: false,
1110+
}) = query[0]
1111+
else {
1112+
panic!("unexpected operator {:#?}", query[0]);
1113+
};
1114+
1115+
assert!(lhs.is_empty());
1116+
1117+
// Assert that original index and probe tables have been swapped.
1118+
assert_eq!(index_table, rhs_id);
1119+
assert_eq!(index_col, 0.into());
1120+
assert_eq!(probe_field, "b");
1121+
assert_eq!(probe_table, "lhs");
1122+
Ok(())
1123+
}
10281124
}

crates/core/src/subscription/query.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,13 +499,14 @@ mod tests {
499499
let indexes = &[(0.into(), "id")];
500500
let lhs_id = create_table(&db, &mut tx, "lhs", schema, indexes)?;
501501

502-
// Create table [rhs] with no indexes
502+
// Create table [rhs] with index on [id]
503503
let schema = &[
504504
("rid", AlgebraicType::I32),
505505
("id", AlgebraicType::I32),
506506
("y", AlgebraicType::I32),
507507
];
508-
let rhs_id = create_table(&db, &mut tx, "rhs", schema, &[])?;
508+
let indexes = &[(1.into(), "id")];
509+
let rhs_id = create_table(&db, &mut tx, "rhs", schema, indexes)?;
509510

510511
// Insert into lhs
511512
for i in 0..5 {

crates/core/src/subscription/subscription.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,9 @@ impl<'a> IncrementalJoin<'a> {
527527
auth: &AuthCtx,
528528
) -> Result<impl Iterator<Item = Op>, DBError> {
529529
let mut inserts = {
530-
let lhs_virt = query::to_mem_table(self.expr.clone(), &self.lhs.inserts());
530+
// Replan query after replacing left table with virtual table,
531+
// since join order may need to be reversed.
532+
let lhs_virt = query::to_mem_table(self.expr.clone(), &self.lhs.inserts()).optimize();
531533
let rhs_virt = self.to_mem_table_rhs(self.rhs.inserts());
532534

533535
// {A+ join B}
@@ -551,7 +553,9 @@ impl<'a> IncrementalJoin<'a> {
551553
set
552554
};
553555
let mut deletes = {
554-
let lhs_virt = query::to_mem_table(self.expr.clone(), &self.lhs.deletes());
556+
// Replan query after replacing left table with virtual table,
557+
// since join order may need to be reversed.
558+
let lhs_virt = query::to_mem_table(self.expr.clone(), &self.lhs.deletes()).optimize();
555559
let rhs_virt = self.to_mem_table_rhs(self.rhs.deletes());
556560

557561
// {A- join B}

crates/core/src/vm.rs

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ use spacetimedb_lib::identity::AuthCtx;
99
use spacetimedb_primitives::{ColId, TableId};
1010
use spacetimedb_sats::db::auth::{StAccess, StTableType};
1111
use spacetimedb_sats::db::def::{ColumnDef, IndexDef, ProductTypeMeta, TableDef};
12-
use spacetimedb_sats::relation::{
13-
DbTable, FieldExpr, FieldName, Header, MemTable, RelIter, RelValue, Relation, RowCount, Table,
14-
};
12+
use spacetimedb_sats::relation::{DbTable, FieldExpr, FieldName, RelValueRef, Relation};
13+
use spacetimedb_sats::relation::{Header, MemTable, RelIter, RelValue, RowCount, Table};
1514
use spacetimedb_sats::{AlgebraicValue, ProductValue};
1615
use spacetimedb_vm::env::EnvDb;
1716
use spacetimedb_vm::errors::ErrorVm;
@@ -53,13 +52,31 @@ pub fn build_query<'a>(
5352
let iter = result.select(move |row| cmp.compare(row, &header));
5453
Box::new(iter)
5554
}
55+
// If this is an index join between two virtual tables, replace with an inner join.
56+
// Such a plan is possible under incremental evaluation,
57+
// when there are updates to both base tables,
58+
// however an index lookup is invalid on a virtual table.
59+
//
60+
// TODO: This logic should be entirely encapsulated within the query planner.
61+
// It should not be possible for the planner to produce an invalid plan.
62+
Query::IndexJoin(join)
63+
if !db_table
64+
&& matches!(join.probe_side.source, SourceExpr::MemTable(_))
65+
&& join.probe_side.source.table_name() != result.head().table_name =>
66+
{
67+
let join: JoinExpr = join.into();
68+
let iter = join_inner(ctx, stdb, tx, result, join, true)?;
69+
Box::new(iter)
70+
}
5671
Query::IndexJoin(IndexJoin {
5772
probe_side,
5873
probe_field,
5974
index_header,
75+
index_select,
6076
index_table,
6177
index_col,
62-
}) if db_table => {
78+
return_index_rows,
79+
}) => {
6380
let probe_side = build_query(ctx, stdb, tx, probe_side.into())?;
6481
Box::new(IndexSemiJoin {
6582
ctx,
@@ -68,16 +85,13 @@ pub fn build_query<'a>(
6885
probe_side,
6986
probe_field,
7087
index_header,
88+
index_select,
7189
index_table,
7290
index_col,
7391
index_iter: None,
92+
return_index_rows,
7493
})
7594
}
76-
Query::IndexJoin(join) => {
77-
let join: JoinExpr = join.into();
78-
let iter = join_inner(ctx, stdb, tx, result, join, true)?;
79-
Box::new(iter)
80-
}
8195
Query::Select(cmp) => {
8296
let header = result.head().clone();
8397
let iter = result.select(move |row| cmp.compare(row, &header));
@@ -189,12 +203,15 @@ pub struct IndexSemiJoin<'a, Rhs: RelOps> {
189203
// The field whose value will be used to probe the index.
190204
pub probe_field: FieldName,
191205
// The header for the index side of the join.
192-
// Also the return header since we are returning values from the index side.
193206
pub index_header: Header,
207+
// An optional predicate to evaluate over the matching rows of the index.
208+
pub index_select: Option<ColumnOp>,
194209
// The table id on which the index is defined.
195210
pub index_table: TableId,
196211
// The column id for which the index is defined.
197212
pub index_col: ColId,
213+
// Is this a left or right semijoin?
// If true, rows from the index side are returned; otherwise rows from the probe side.
214+
pub return_index_rows: bool,
198215
// An iterator for the index side.
199216
// A new iterator will be instantiated for each row on the probe side.
200217
pub index_iter: Option<IterByColEq<'a>>,
@@ -206,9 +223,32 @@ pub struct IndexSemiJoin<'a, Rhs: RelOps> {
206223
ctx: &'a ExecutionContext<'a>,
207224
}
208225

226+
impl<'a, Rhs: RelOps> IndexSemiJoin<'a, Rhs> {
227+
fn filter(&self, index_row: RelValueRef) -> Result<bool, ErrorVm> {
228+
if let Some(op) = &self.index_select {
229+
Ok(op.compare(index_row, &self.index_header)?)
230+
} else {
231+
Ok(true)
232+
}
233+
}
234+
235+
fn map(&self, index_row: RelValue, probe_row: Option<RelValue>) -> RelValue {
236+
if let Some(value) = probe_row {
237+
if !self.return_index_rows {
238+
return value;
239+
}
240+
}
241+
index_row
242+
}
243+
}
244+
209245
impl<'a, Rhs: RelOps> RelOps for IndexSemiJoin<'a, Rhs> {
210246
fn head(&self) -> &Header {
211-
&self.index_header
247+
if self.return_index_rows {
248+
&self.index_header
249+
} else {
250+
self.probe_side.head()
251+
}
212252
}
213253

214254
fn row_count(&self) -> RowCount {
@@ -218,8 +258,13 @@ impl<'a, Rhs: RelOps> RelOps for IndexSemiJoin<'a, Rhs> {
218258
#[tracing::instrument(skip_all)]
219259
fn next(&mut self) -> Result<Option<RelValue>, ErrorVm> {
220260
// Return a value from the current index iterator, if not exhausted.
221-
if let Some(value) = self.index_iter.as_mut().and_then(|iter| iter.next()) {
222-
return Ok(Some(value.to_rel_value()));
261+
if self.return_index_rows {
262+
while let Some(value) = self.index_iter.as_mut().and_then(|iter| iter.next()) {
263+
let value = value.to_rel_value();
264+
if self.filter(value.as_val_ref())? {
265+
return Ok(Some(self.map(value, None)));
266+
}
267+
}
223268
}
224269
// Otherwise probe the index with a row from the probe side.
225270
while let Some(row) = self.probe_side.next()? {
@@ -229,9 +274,12 @@ impl<'a, Rhs: RelOps> RelOps for IndexSemiJoin<'a, Rhs> {
229274
let col_id = self.index_col;
230275
let value = value.clone();
231276
let mut index_iter = self.db.iter_by_col_eq(self.ctx, self.tx, table_id, col_id, value)?;
232-
if let Some(value) = index_iter.next() {
233-
self.index_iter = Some(index_iter);
234-
return Ok(Some(value.to_rel_value()));
277+
while let Some(value) = index_iter.next() {
278+
let value = value.to_rel_value();
279+
if self.filter(value.as_val_ref())? {
280+
self.index_iter = Some(index_iter);
281+
return Ok(Some(self.map(value, Some(row))));
282+
}
235283
}
236284
}
237285
}

0 commit comments

Comments
 (0)