Skip to content

Commit 95c5eba

Browse files
perf(623): Use row counts in query optimization
Closes #623. Before this patch query optimization was entirely syntax driven. Now that we keep table size metrics we can be somewhat data driven. This patch improves index joins, by using row counts to determine the index side and the probe side.
1 parent 3d88f58 commit 95c5eba

File tree

5 files changed

+60
-36
lines changed

5 files changed

+60
-36
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct RelationalDB {
3636
commit_log: Option<CommitLogMut>,
3737
_lock: Arc<File>,
3838
address: Address,
39+
row_count_fn: Arc<dyn Fn(TableId) -> i64 + Send + Sync>,
3940
}
4041

4142
impl DataRow for RelationalDB {
@@ -139,12 +140,29 @@ impl RelationalDB {
139140
commit_log,
140141
_lock: Arc::new(lock),
141142
address: db_address,
143+
row_count_fn: Arc::new(move |table| {
144+
METRICS
145+
.rdb_num_table_rows
146+
.with_label_values(&db_address, &table.into())
147+
.get()
148+
}),
142149
};
143150

144151
log::info!("[{}] DATABASE: OPENED", address);
145152
Ok(db)
146153
}
147154

155+
/// Returns an approximate row count for a particular table.
156+
pub fn row_count(&self, table: TableId) -> i64 {
157+
(self.row_count_fn)(table)
158+
}
159+
160+
/// Update this `RelationalDB` with an approximate row count function.
161+
pub fn with_row_count(mut self, row_count: Arc<dyn Fn(TableId) -> i64 + Send + Sync>) -> Self {
162+
self.row_count_fn = row_count;
163+
self
164+
}
165+
148166
/// Returns the address for this database
149167
pub fn address(&self) -> Address {
150168
self.address
@@ -654,7 +672,7 @@ pub(crate) mod tests_utils {
654672
let tmp_dir = TempDir::with_prefix("stdb_test")?;
655673
let in_memory = false;
656674
let fsync = false;
657-
let stdb = open_db(&tmp_dir, in_memory, fsync)?;
675+
let stdb = open_db(&tmp_dir, in_memory, fsync)?.with_row_count(Arc::new(|_| i64::MAX));
658676
Ok((stdb, tmp_dir))
659677
}
660678
}

crates/core/src/sql/compiler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ fn compile_statement(db: &RelationalDB, statement: SqlAst) -> Result<CrudExpr, P
270270
} => compile_drop(name, kind, table_access)?,
271271
};
272272

273-
Ok(q.optimize(Some(db.address())))
273+
Ok(q.optimize(&|table| db.row_count(table)))
274274
}
275275

276276
#[cfg(test)]

crates/core/src/subscription/subscription.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ impl<'a> IncrementalJoin<'a> {
657657
// Replan query after replacing the indexed table with a virtual table,
658658
// since join order may need to be reversed.
659659
let join_a = with_delta_table(self.join.clone(), true, self.index_side.inserts());
660-
let join_a = QueryExpr::from(join_a).optimize(Some(db.address()));
660+
let join_a = QueryExpr::from(join_a).optimize(&|table| db.row_count(table));
661661

662662
// No need to replan after replacing the probe side with a virtual table,
663663
// since no new constraints have been added.
@@ -685,7 +685,7 @@ impl<'a> IncrementalJoin<'a> {
685685
// Replan query after replacing the indexed table with a virtual table,
686686
// since join order may need to be reversed.
687687
let join_a = with_delta_table(self.join.clone(), true, self.index_side.deletes());
688-
let join_a = QueryExpr::from(join_a).optimize(Some(db.address()));
688+
let join_a = QueryExpr::from(join_a).optimize(&|table| db.row_count(table));
689689

690690
// No need to replan after replacing the probe side with a virtual table,
691691
// since no new constraints have been added.
@@ -880,7 +880,7 @@ mod tests {
880880

881881
// Optimize the query plan for the incremental update.
882882
let expr: QueryExpr = with_delta_table(join, true, delta).into();
883-
let mut expr = expr.optimize(Some(db.address()));
883+
let mut expr = expr.optimize(&|_| i64::MAX);
884884

885885
assert_eq!(expr.source.table_name(), "lhs");
886886
assert_eq!(expr.query.len(), 1);
@@ -975,7 +975,7 @@ mod tests {
975975

976976
// Optimize the query plan for the incremental update.
977977
let expr: QueryExpr = with_delta_table(join, false, delta).into();
978-
let mut expr = expr.optimize(Some(db.address()));
978+
let mut expr = expr.optimize(&|_| i64::MAX);
979979

980980
assert_eq!(expr.source.table_name(), "lhs");
981981
assert_eq!(expr.query.len(), 1);

crates/vm/src/eval.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ fn build_typed<P: ProgramVm>(p: &mut P, node: Expr) -> ExprOpt {
7777
}
7878
}
7979
Expr::Crud(q) => {
80-
let q = q.optimize(p.address());
80+
let q = q.optimize(&|_| i64::MAX);
8181
match q {
8282
CrudExpr::Query(q) => {
8383
let source = build_query_opt(q);

crates/vm/src/expr.rs

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::errors::{ErrorKind, ErrorLang, ErrorType, ErrorVm};
99
use crate::functions::{FunDef, Param};
1010
use crate::operator::{Op, OpCmp, OpLogic, OpQuery};
1111
use crate::types::Ty;
12-
use spacetimedb_lib::{Address, Identity};
12+
use spacetimedb_lib::Identity;
1313
use spacetimedb_primitives::*;
1414
use spacetimedb_sats::algebraic_type::AlgebraicType;
1515
use spacetimedb_sats::algebraic_value::AlgebraicValue;
@@ -426,7 +426,7 @@ impl IndexJoin {
426426
// Reorder the index and probe sides of an index join.
427427
// This is necessary if the indexed table has been replaced by a delta table.
428428
// A delta table is a virtual table consisting of changes or updates to a physical table.
429-
pub fn reorder(self) -> Self {
429+
pub fn reorder(self, row_count: impl Fn(TableId) -> i64) -> Self {
430430
// The probe table must be a physical table.
431431
if matches!(self.probe_side.source, SourceExpr::MemTable(_)) {
432432
return self;
@@ -449,26 +449,25 @@ impl IndexJoin {
449449
{
450450
return self;
451451
}
452-
// Returns the `FieldName` for this `ColId`.
453-
// Note there is an unwrap which the compiler will ensure is safe.
454-
// The existence of `col` will already have been verified,
455-
// during construction of the index join.
456-
fn field(table: &MemTable, col: &ColId) -> FieldName {
457-
table
458-
.head()
459-
.fields
460-
.iter()
461-
.find(|Column { col_id, .. }| col_id == col)
462-
.unwrap()
463-
.field
464-
.clone()
465-
}
466452
// For the same reason the compiler also ensures this unwrap is safe.
467453
let probe_column = self.probe_side.source.head().column(&self.probe_field).unwrap().col_id;
468454
match self.index_side {
469-
Table::MemTable(delta) => {
470-
let index_field = field(&delta, &self.index_col);
471-
455+
// If the size of the indexed table is sufficiently large, do not reorder.
456+
Table::DbTable(DbTable { table_id, .. }) if row_count(table_id) > 1000 => self,
457+
// If this is a delta table, we must reorder.
458+
// If this is a sufficiently small physical table, we should reorder.
459+
table => {
460+
// The compiler ensures the following unwrap is safe.
461+
// The existence of the index `ColId` as already been verified,
462+
// during construction of the index join.
463+
let index_field = table
464+
.head()
465+
.fields
466+
.iter()
467+
.find(|col| col.col_id == self.index_col)
468+
.unwrap()
469+
.field
470+
.clone();
472471
// Merge all selections from the original probe side into a single predicate.
473472
// This includes an index scan if present.
474473
let predicate = self.probe_side.query.into_iter().fold(None, |acc, op| {
@@ -483,11 +482,11 @@ impl IndexJoin {
483482
// Push any selections on the index side to the probe side.
484483
let probe_side = if let Some(predicate) = self.index_select {
485484
QueryExpr {
486-
source: delta.into(),
485+
source: table.into(),
487486
query: vec![predicate.into()],
488487
}
489488
} else {
490-
delta.into()
489+
table.into()
491490
};
492491
IndexJoin {
493492
// The new probe side consists of the updated rows.
@@ -506,7 +505,6 @@ impl IndexJoin {
506505
return_index_rows: !self.return_index_rows,
507506
}
508507
}
509-
Table::DbTable(_) => self,
510508
}
511509
}
512510

@@ -624,9 +622,9 @@ pub enum CrudExpr {
624622
}
625623

626624
impl CrudExpr {
627-
pub fn optimize(self, db: Option<Address>) -> Self {
625+
pub fn optimize(self, row_count: &impl Fn(TableId) -> i64) -> Self {
628626
match self {
629-
CrudExpr::Query(x) => CrudExpr::Query(x.optimize(db)),
627+
CrudExpr::Query(x) => CrudExpr::Query(x.optimize(row_count)),
630628
_ => self,
631629
}
632630
}
@@ -1286,7 +1284,8 @@ impl QueryExpr {
12861284
//
12871285
// Ex. SELECT Left.* FROM Left JOIN Right ON Left.id = Right.id ...
12881286
// where `Left` has an index defined on `id`.
1289-
fn try_index_join(mut query: QueryExpr, _db: Option<Address>) -> QueryExpr {
1287+
fn try_index_join(self) -> QueryExpr {
1288+
let mut query = self;
12901289
// We expect 2 and only 2 operations - a join followed by a wildcard projection.
12911290
if query.query.len() != 2 {
12921291
return query;
@@ -1391,7 +1390,7 @@ impl QueryExpr {
13911390
q
13921391
}
13931392

1394-
pub fn optimize(mut self, db: Option<Address>) -> Self {
1393+
pub fn optimize(mut self, row_count: &impl Fn(TableId) -> i64) -> Self {
13951394
let mut q = Self {
13961395
source: self.source.clone(),
13971396
query: Vec::with_capacity(self.query.len()),
@@ -1405,7 +1404,7 @@ impl QueryExpr {
14051404

14061405
if self.query.len() == 1 && matches!(self.query[0], Query::IndexJoin(_)) {
14071406
if let Some(Query::IndexJoin(join)) = self.query.pop() {
1408-
q.query.push(Query::IndexJoin(join.reorder()));
1407+
q.query.push(Query::IndexJoin(join.reorder(row_count)));
14091408
return q;
14101409
}
14111410
}
@@ -1415,11 +1414,18 @@ impl QueryExpr {
14151414
Query::Select(op) => {
14161415
q = Self::optimize_select(q, op, &tables);
14171416
}
1418-
Query::JoinInner(join) => q = q.with_join_inner(join.rhs.optimize(db), join.col_lhs, join.col_rhs),
1417+
Query::JoinInner(join) => {
1418+
q = q.with_join_inner(join.rhs.optimize(row_count), join.col_lhs, join.col_rhs)
1419+
}
14191420
_ => q.query.push(query),
14201421
};
14211422
}
1422-
Self::try_index_join(q, db)
1423+
1424+
let q = q.try_index_join();
1425+
if q.query.len() == 1 && matches!(q.query[0], Query::IndexJoin(_)) {
1426+
return q.optimize(row_count);
1427+
}
1428+
q
14231429
}
14241430
}
14251431

0 commit comments

Comments
 (0)