Skip to content

Commit 6f21b41

Browse files
refactor(compiler): Swap the index and probe sides of an index join
Before this change we would swap the index and probe sides if (1) the index side was replaced with a delta table, (2) the probe side had an index defined on the join field, and (3) the index side was the return side. After this change we swap if (1) the index side has been replaced with a delta table, and (2) the probe side has an index defined on the join field. We no longer place restrictions on the rows that are returned.
1 parent 3b053d3 commit 6f21b41

File tree

1 file changed

+113
-100
lines changed

1 file changed

+113
-100
lines changed

crates/vm/src/expr.rs

Lines changed: 113 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,113 @@ impl From<IndexJoin> for QueryExpr {
422422
}
423423
}
424424

425+
impl IndexJoin {
426+
// Reorder the index and probe sides of an index join.
427+
// This is necessary if the indexed table has been replaced by a delta table.
428+
// A delta table is a virtual table consisting of changes or updates to a physical table.
429+
pub fn reorder(self) -> Self {
430+
// The probe table must be a physical table.
431+
if matches!(self.probe_side.source, SourceExpr::MemTable(_)) {
432+
return self;
433+
}
434+
// It must have an index defined on the join field.
435+
if !self
436+
.probe_side
437+
.source
438+
.head()
439+
.has_constraint(&self.probe_field, Constraints::indexed())
440+
{
441+
return self;
442+
}
443+
// It must be a linear pipeline of selections.
444+
if !self
445+
.probe_side
446+
.query
447+
.iter()
448+
.all(|op| matches!(op, Query::Select(_)) || matches!(op, Query::IndexScan(_)))
449+
{
450+
return self;
451+
}
452+
// Returns the `FieldName` for this `ColId`.
453+
// Note there is an unwrap which the compiler will ensure is safe.
454+
// The existence of `col` will already have been verified,
455+
// during construction of the index join.
456+
fn field(table: &MemTable, col: &ColId) -> FieldName {
457+
table
458+
.head()
459+
.fields
460+
.iter()
461+
.find(|Column { col_id, .. }| col_id == col)
462+
.unwrap()
463+
.field
464+
.clone()
465+
}
466+
// For the same reason the compiler also ensures this unwrap is safe.
467+
let probe_column = self.probe_side.source.head().column(&self.probe_field).unwrap().col_id;
468+
match self.index_side {
469+
Table::MemTable(delta) if self.return_index_rows => {
470+
let index_field = field(&delta, &self.index_col);
471+
472+
// Merge all selections from the original probe side into a single predicate.
473+
// This includes an index scan if present.
474+
let predicate = self.probe_side.query.into_iter().fold(None, |acc, op| {
475+
<Query as Into<Option<ColumnOp>>>::into(op).map(|op| {
476+
if let Some(predicate) = acc {
477+
ColumnOp::new(OpQuery::Logic(OpLogic::And), predicate, op)
478+
} else {
479+
op
480+
}
481+
})
482+
});
483+
IndexJoin {
484+
// The new probe side consists of the updated rows.
485+
probe_side: delta.into(),
486+
// The new probe field is the previous index field.
487+
probe_field: index_field,
488+
// The original probe table is now the table that is being probed.
489+
index_side: self.probe_side.source.into(),
490+
// Any selections from the original probe side are pulled above the index lookup.
491+
index_select: predicate,
492+
// The new index field is the previous probe field.
493+
index_col: probe_column,
494+
// Because we have swapped the original index and probe sides of the join,
495+
// the new index join needs to return rows from the probe side instead of the index side.
496+
return_index_rows: false,
497+
}
498+
}
499+
Table::MemTable(delta) => {
500+
let index_field = field(&delta, &self.index_col);
501+
502+
let probe_side = if let Some(predicate) = self.index_select {
503+
QueryExpr {
504+
source: delta.into(),
505+
query: vec![predicate.into()],
506+
}
507+
} else {
508+
delta.into()
509+
};
510+
IndexJoin {
511+
// The new probe side consists of the updated rows.
512+
// Plus any selections from the original index probe.
513+
probe_side,
514+
// The new probe field is the previous index field.
515+
probe_field: index_field,
516+
// The original probe table is now the table that is being probed.
517+
index_side: self.probe_side.source.into(),
518+
// Any selections from the index probe side are pushed to the probe side.
519+
index_select: None,
520+
// The new index field is the previous probe field.
521+
index_col: probe_column,
522+
// Because we have swapped the original index and probe sides of the join,
523+
// the new index join needs to return rows from the index side instead of the probe side.
524+
return_index_rows: true,
525+
}
526+
}
527+
Table::DbTable(_) => self,
528+
}
529+
}
530+
}
531+
425532
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
426533
pub struct JoinExpr {
427534
pub rhs: QueryExpr,
@@ -1264,100 +1371,7 @@ impl QueryExpr {
12641371
q
12651372
}
12661373

1267-
// Is this an incremental evaluation of an index join {L+ join R}
1268-
fn is_incremental_index_join(&self) -> bool {
1269-
if self.query.len() != 1 {
1270-
return false;
1271-
}
1272-
// Is this in index join?
1273-
let Query::IndexJoin(IndexJoin {
1274-
probe_side:
1275-
QueryExpr {
1276-
source: SourceExpr::DbTable(rhs_table),
1277-
query: selections,
1278-
},
1279-
probe_field,
1280-
index_select: None,
1281-
return_index_rows: true,
1282-
..
1283-
}) = &self.query[0]
1284-
else {
1285-
return false;
1286-
};
1287-
// Is this an incremental evaluation of updates to the left hand table?
1288-
let SourceExpr::MemTable(_) = self.source else {
1289-
return false;
1290-
};
1291-
// Does the right hand table have an index on the join field?
1292-
if !rhs_table.head.has_constraint(probe_field, Constraints::indexed()) {
1293-
return false;
1294-
};
1295-
// The original probe side must consist of an optional index scan,
1296-
// followed by an arbitrary number of selections.
1297-
selections
1298-
.iter()
1299-
.all(|op| matches!(op, Query::Select(_)) || matches!(op, Query::IndexScan(_)))
1300-
}
1301-
1302-
// Assuming this is an incremental evaluation of an index join {L+ join R},
1303-
// swap the index and probe sides to avoid scanning all of R.
1304-
fn optimize_incremental_index_join(mut self) -> Option<IndexJoin> {
1305-
// This is an index join.
1306-
let Some(Query::IndexJoin(IndexJoin {
1307-
probe_side:
1308-
QueryExpr {
1309-
source: SourceExpr::DbTable(rhs_table),
1310-
query: selections,
1311-
},
1312-
probe_field,
1313-
index_side,
1314-
index_col,
1315-
index_select: None,
1316-
return_index_rows: true,
1317-
})) = self.query.pop()
1318-
else {
1319-
return None;
1320-
};
1321-
// This is an incremental evaluation of updates to the left hand table.
1322-
let SourceExpr::MemTable(index_side_updates) = self.source else {
1323-
return None;
1324-
};
1325-
let index_column = index_side
1326-
.head()
1327-
.fields
1328-
.iter()
1329-
.find(|column| column.col_id == index_col)?;
1330-
let index_field = index_column.field.clone();
1331-
let probe_column = rhs_table.head.column(&probe_field)?.col_id;
1332-
// Merge all selections from the original probe side into a single predicate.
1333-
// This includes an index scan if present.
1334-
let predicate = selections.iter().cloned().fold(None, |acc, op| {
1335-
<Query as Into<Option<ColumnOp>>>::into(op).map(|op| {
1336-
if let Some(predicate) = acc {
1337-
ColumnOp::new(OpQuery::Logic(OpLogic::And), predicate, op)
1338-
} else {
1339-
op
1340-
}
1341-
})
1342-
});
1343-
Some(IndexJoin {
1344-
// The new probe side consists of the updated rows.
1345-
probe_side: index_side_updates.into(),
1346-
// The new probe field is the previous index field.
1347-
probe_field: index_field,
1348-
// The original probe table is now the table that is being probed.
1349-
index_side: rhs_table.into(),
1350-
// Any selections from the original probe side are pulled above the index lookup.
1351-
index_select: predicate,
1352-
// The new index field is the previous probe field.
1353-
index_col: probe_column,
1354-
// Because we have swapped the original index and probe sides of the join,
1355-
// the new index join needs to return rows from the probe side instead of the index side.
1356-
return_index_rows: false,
1357-
})
1358-
}
1359-
1360-
pub fn optimize(self, db: Option<Address>) -> Self {
1374+
pub fn optimize(mut self, db: Option<Address>) -> Self {
13611375
let mut q = Self {
13621376
source: self.source.clone(),
13631377
query: Vec::with_capacity(self.query.len()),
@@ -1369,12 +1383,11 @@ impl QueryExpr {
13691383
.flat_map(|x| x.into_iter())
13701384
.collect();
13711385

1372-
if self.is_incremental_index_join() {
1373-
// The above check guarantees that the optimization will succeed,
1374-
// and therefore it is safe to unwrap.
1375-
let index_join = self.optimize_incremental_index_join().unwrap();
1376-
q.query.push(Query::IndexJoin(index_join));
1377-
return q;
1386+
if self.query.len() == 1 && matches!(self.query[0], Query::IndexJoin(_)) {
1387+
if let Some(Query::IndexJoin(join)) = self.query.pop() {
1388+
q.query.push(Query::IndexJoin(join.reorder()));
1389+
return q;
1390+
}
13781391
}
13791392

13801393
for query in self.query {

0 commit comments

Comments
 (0)