Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use spacetimedb_sats::buffer::BufWriter;
use spacetimedb_sats::db::def::{IndexDef, IndexType};
use spacetimedb_sats::relation::{FieldExpr, FieldName};
use spacetimedb_sats::{ProductType, Typespace};
use spacetimedb_vm::expr::{Code, ColumnOp};
use spacetimedb_vm::expr::{Code, ColumnOp, SourceSet};

#[derive(Clone)]
pub struct InstanceEnv {
Expand Down Expand Up @@ -368,11 +368,14 @@ impl InstanceEnv {
filter,
)
.map_err(NodesError::DecodeFilter)?;
let q = spacetimedb_vm::dsl::query(&*schema).with_select(filter_to_column_op(&schema.table_name, filter));

let q =
spacetimedb_vm::dsl::query(schema.as_ref()).with_select(filter_to_column_op(&schema.table_name, filter));
//TODO: How pass the `caller` here?
let mut tx: TxMode = tx.into();
let p = &mut DbProgram::new(ctx, stdb, &mut tx, AuthCtx::for_current(self.dbic.identity));
let results = match spacetimedb_vm::eval::run_ast(p, q.into()) {
// SQL queries can never reference `MemTable`s, so pass in an empty `SourceSet`.
let results = match spacetimedb_vm::eval::run_ast(p, q.into(), SourceSet::default()) {
Code::Table(table) => table,
_ => unreachable!("query should always return a table"),
};
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
let auth = AuthCtx::new(self.database_instance_context.identity, caller_identity);
log::debug!("One-off query: {query}");
let ctx = &ExecutionContext::sql(db.address());
let compiled = db.with_read_only(ctx, |tx| {
sql::compiler::compile_sql(db, tx, &query)?
.into_iter()
let compiled: Vec<_> = db.with_read_only(ctx, |tx| {
let ast = sql::compiler::compile_sql(db, tx, &query)?;
ast.into_iter()
.map(|expr| {
if matches!(expr, CrudExpr::Query { .. }) {
Ok(expr)
Expand Down
18 changes: 10 additions & 8 deletions crates/core/src/sql/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use std::sync::Arc;

use super::ast::TableSchemaView;

/// Compile the `SQL` expression into a `ast`
/// Compile the `SQL` expression into an `ast`
#[tracing::instrument(skip_all)]
pub fn compile_sql<T: TableSchemaView>(db: &RelationalDB, tx: &T, sql_text: &str) -> Result<Vec<CrudExpr>, DBError> {
tracing::trace!(sql = sql_text);
let ast = compile_to_ast(db, tx, sql_text)?;

// TODO(perf, bikeshedding): SmallVec?
let mut results = Vec::with_capacity(ast.len());

for sql in ast {
Expand Down Expand Up @@ -133,23 +134,25 @@ fn compile_select(table: From, project: Vec<Column>, selection: Option<Selection
});
}

let mut q = query(db_table_raw(
let source_expr = SourceExpr::DbTable(db_table_raw(
&table.root,
table.root.table_id,
table.root.table_type,
table.root.table_access,
));

let mut q = query(source_expr);

if let Some(ref joins) = table.join {
for join in joins {
match join {
Join::Inner { rhs, on } => {
let t = db_table(rhs, rhs.table_id);
let rhs_source_expr = SourceExpr::DbTable(db_table(rhs, rhs.table_id));
match on.op {
OpCmp::Eq => {}
x => unreachable!("Unsupported operator `{x}` for joins"),
}
q = q.with_join_inner(t, on.lhs.clone(), on.rhs.clone());
q = q.with_join_inner(rhs_source_expr, on.lhs.clone(), on.rhs.clone());
}
}
}
Expand Down Expand Up @@ -196,7 +199,7 @@ fn compile_insert(
columns: Vec<FieldName>,
values: Vec<Vec<FieldExpr>>,
) -> Result<CrudExpr, PlanError> {
let db_table = compile_columns(&table, columns);
let source_expr = SourceExpr::DbTable(compile_columns(&table, columns));

let mut rows = Vec::with_capacity(values.len());
for x in values {
Expand All @@ -215,7 +218,7 @@ fn compile_insert(
}

Ok(CrudExpr::Insert {
source: SourceExpr::DbTable(db_table),
source: source_expr,
rows,
})
}
Expand Down Expand Up @@ -299,7 +302,6 @@ mod tests {
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_sats::AlgebraicType;
use spacetimedb_vm::expr::{IndexJoin, IndexScan, JoinExpr, Query};
use spacetimedb_vm::relation::Table;

fn assert_index_scan(
op: Query,
Expand Down Expand Up @@ -939,7 +941,7 @@ mod tests {
table: ref probe_table,
field: ref probe_field,
},
index_side: Table::DbTable(DbTable {
index_side: SourceExpr::DbTable(DbTable {
table_id: index_table, ..
}),
index_col,
Expand Down
27 changes: 15 additions & 12 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::{ProductType, ProductValue};
use spacetimedb_vm::eval::run_ast;
use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr};
use spacetimedb_vm::relation::MemTable;
use tracing::info;

use super::compiler::compile_sql;
use crate::database_instance_context_controller::DatabaseInstanceContextController;
use crate::db::relational_db::{MutTx, RelationalDB, Tx};
use crate::error::{DBError, DatabaseError};
use crate::execution_context::ExecutionContext;
use crate::sql::compiler::compile_sql;
use crate::vm::{DbProgram, TxMode};
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::{ProductType, ProductValue};
use spacetimedb_vm::eval::run_ast;
use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr, SourceSet};
use spacetimedb_vm::relation::MemTable;
use tracing::info;

pub struct StmtResult {
pub schema: ProductType,
Expand Down Expand Up @@ -59,13 +58,14 @@ pub fn execute_single_sql(
tx: &Tx,
ast: CrudExpr,
auth: AuthCtx,
sources: SourceSet,
) -> Result<Vec<MemTable>, DBError> {
let mut tx: TxMode = tx.into();
let p = &mut DbProgram::new(cx, db, &mut tx, auth);
let q = Expr::Crud(Box::new(ast));

let mut result = Vec::with_capacity(1);
collect_result(&mut result, run_ast(p, q).into())?;
collect_result(&mut result, run_ast(p, q, sources).into())?;
Ok(result)
}

Expand All @@ -75,6 +75,7 @@ pub fn execute_sql_mut_tx(
tx: &mut MutTx,
ast: Vec<CrudExpr>,
auth: AuthCtx,
sources: SourceSet,
) -> Result<Vec<MemTable>, DBError> {
let total = ast.len();
let mut tx: TxMode = tx.into();
Expand All @@ -83,7 +84,7 @@ pub fn execute_sql_mut_tx(
let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect());

let mut result = Vec::with_capacity(total);
collect_result(&mut result, run_ast(p, q).into())?;
collect_result(&mut result, run_ast(p, q, sources).into())?;
Ok(result)
}

Expand All @@ -100,13 +101,15 @@ pub fn execute_sql(db: &RelationalDB, ast: Vec<CrudExpr>, auth: AuthCtx) -> Resu
let mut tx: TxMode = mut_tx.into();
let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect());
let p = &mut DbProgram::new(&ctx, db, &mut tx, auth);
collect_result(&mut result, run_ast(p, q).into())
// SQL queries can never reference `MemTable`s, so pass an empty `SourceSet`.
collect_result(&mut result, run_ast(p, q, SourceSet::default()).into())
}),
true => db.with_read_only(&ctx, |tx| {
let mut tx = TxMode::Tx(tx);
let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect());
let p = &mut DbProgram::new(&ctx, db, &mut tx, auth);
collect_result(&mut result, run_ast(p, q).into())
// SQL queries can never reference `MemTable`s, so pass an empty `SourceSet`.
collect_result(&mut result, run_ast(p, q, SourceSet::default()).into())
}),
}?;

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use super::{
query::compile_read_only_query,
subscription::{ExecutionSet, Subscription},
Expand All @@ -20,6 +18,7 @@ use parking_lot::RwLock;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::Identity;
use std::sync::Arc;

type Subscriptions = Arc<RwLock<Vec<Subscription>>>;
#[derive(Debug)]
Expand Down
72 changes: 41 additions & 31 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::sync::Arc;
use std::time::Instant;

use crate::db::db_metrics::{DB_METRICS, MAX_QUERY_COMPILE_TIME};
use crate::db::relational_db::{RelationalDB, Tx};
use crate::error::{DBError, SubscriptionError};
Expand All @@ -16,9 +13,10 @@ use spacetimedb_lib::Address;
use spacetimedb_sats::db::auth::StAccess;
use spacetimedb_sats::relation::{Column, FieldName, Header};
use spacetimedb_sats::AlgebraicType;
use spacetimedb_vm::expr;
use spacetimedb_vm::expr::{Crud, CrudExpr, DbType, QueryExpr};
use spacetimedb_vm::expr::{self, Crud, CrudExpr, DbType, QueryExpr, SourceSet};
use spacetimedb_vm::relation::MemTable;
use std::sync::Arc;
use std::time::Instant;

use super::subscription::get_all;

Expand Down Expand Up @@ -67,9 +65,12 @@ pub fn to_mem_table_with_op_type(head: Arc<Header>, table_access: StAccess, data
///
/// To be able to reify the `op_type` of the individual operations in the update,
/// each virtual row is extended with a column [`OP_TYPE_FIELD_NAME`].
pub fn to_mem_table(mut of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
of.source = to_mem_table_with_op_type(of.source.head().clone(), of.source.table_access(), data).into();
of
pub fn to_mem_table(mut of: QueryExpr, data: &DatabaseTableUpdate) -> (QueryExpr, SourceSet) {
let mem_table = to_mem_table_with_op_type(of.source.head().clone(), of.source.table_access(), data);
let mut sources = SourceSet::default();
let source_expr = sources.add_mem_table(mem_table);
of.source = source_expr;
(of, sources)
}

/// Runs a query that evaluates if the changes made should be reported to the [ModuleSubscriptionManager]
Expand All @@ -80,26 +81,21 @@ pub(crate) fn run_query(
tx: &Tx,
query: &QueryExpr,
auth: AuthCtx,
sources: SourceSet,
) -> Result<Vec<MemTable>, DBError> {
execute_single_sql(cx, db, tx, CrudExpr::Query(query.clone()), auth)
execute_single_sql(cx, db, tx, CrudExpr::Query(query.clone()), auth, sources)
}

// TODO: It's semantically wrong to `SUBSCRIBE_TO_ALL_QUERY`
// as it can only return back the changes valid for the tables in scope *right now*
// instead of **continuously updating** the db changes
// with system table modifications (add/remove tables, indexes, ...).
/// Compile from `SQL` into a [`Query`], rejecting empty queries and queries that attempt to modify the data in any way.
///
/// NOTE: When the `input` query is equal to [`SUBSCRIBE_TO_ALL_QUERY`],
/// **compilation is bypassed** and the equivalent of the following is done:
//
/// Variant of [`compile_read_only_query`] which appends `SourceExpr`s into a given `SourceBuilder`,
/// rather than returning a new `SourceSet`.
///
///```rust,ignore
/// for t in db.user_tables {
/// query.push(format!("SELECT * FROM {t}"));
/// }
/// ```
///
/// WARNING: [`SUBSCRIBE_TO_ALL_QUERY`] is only valid for repeated calls as long there is not change on database schema, and the clients must `unsubscribe` before modifying it.
/// This is necessary when merging multiple SQL queries into a single query set,
/// as in [`crate::subscription::module_subscription_actor::ModuleSubscriptions::add_subscriber`].
#[tracing::instrument(skip(relational_db, auth, tx))]
pub fn compile_read_only_query(
relational_db: &RelationalDB,
Expand Down Expand Up @@ -173,7 +169,7 @@ fn record_query_compilation_metrics(workload: WorkloadType, db: &Address, query:
}

/// The kind of [`QueryExpr`] currently supported for incremental evaluation.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub enum Supported {
/// A scan or [`QueryExpr::Select`] of a single table.
Scan,
Expand Down Expand Up @@ -218,7 +214,7 @@ mod tests {
use spacetimedb_sats::db::def::*;
use spacetimedb_sats::relation::FieldName;
use spacetimedb_sats::{product, ProductType, ProductValue};
use spacetimedb_vm::dsl::{db_table, mem_table, scalar};
use spacetimedb_vm::dsl::{mem_table, scalar};
use spacetimedb_vm::operator::OpCmp;

fn insert_op(table_id: TableId, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
Expand Down Expand Up @@ -263,7 +259,8 @@ mod tests {
};

let schema = db.schema_for_table_mut(tx, table_id).unwrap().into_owned();
let q = QueryExpr::new(db_table(&schema, table_id));

let q = QueryExpr::new(&schema);

Ok((schema, table, data, q))
}
Expand Down Expand Up @@ -323,8 +320,15 @@ mod tests {
q: &QueryExpr,
data: &DatabaseTableUpdate,
) -> ResultTest<()> {
let q = to_mem_table(q.clone(), data);
let result = run_query(&ExecutionContext::default(), db, tx, &q, AuthCtx::for_testing())?;
let (q, sources) = to_mem_table(q.clone(), data);
let result = run_query(
&ExecutionContext::default(),
db,
tx,
&q,
AuthCtx::for_testing(),
sources,
)?;

assert_eq!(
Some(table.as_without_table_name()),
Expand Down Expand Up @@ -386,6 +390,10 @@ mod tests {
Ok(())
}

fn singleton_execution_set(expr: QueryExpr) -> ResultTest<ExecutionSet> {
Ok(ExecutionSet::from_iter([SupportedQuery::try_from(expr)?]))
}

#[test]
fn test_eval_incr_for_index_scan() -> ResultTest<()> {
let (db, _tmp) = make_test_db()?;
Expand Down Expand Up @@ -423,7 +431,7 @@ mod tests {
panic!("unexpected query {:#?}", exp[0]);
};

let query: ExecutionSet = query.try_into()?;
let query: ExecutionSet = singleton_execution_set(query)?;

let result = query.eval_incr(&db, &tx, &update, AuthCtx::for_testing())?;

Expand Down Expand Up @@ -487,7 +495,7 @@ mod tests {
panic!("unexpected query {:#?}", exp[0]);
};

let query: ExecutionSet = query.try_into()?;
let query: ExecutionSet = singleton_execution_set(query)?;

db.release_tx(&ExecutionContext::default(), tx);

Expand Down Expand Up @@ -758,13 +766,13 @@ mod tests {
check_query(&db, &table, &tx, &q, &data)?;

//SELECT * FROM inventory WHERE inventory_id = 1
let q_id = QueryExpr::new(db_table(&schema, schema.table_id)).with_select_cmp(
let q_id = QueryExpr::new(&schema).with_select_cmp(
OpCmp::Eq,
FieldName::named("_inventory", "inventory_id"),
scalar(1u64),
);

let s = ExecutionSet::from_iter([q_id.try_into()?]);
let s = singleton_execution_set(q_id)?;

let row2 = TableOp::insert(row.clone());

Expand All @@ -780,16 +788,17 @@ mod tests {

check_query_incr(&db, &tx, &s, &update, 1, &[row])?;

let q = QueryExpr::new(db_table(&schema, schema.table_id));
let q = QueryExpr::new(&schema);

let q = to_mem_table(q, &data);
let (q, sources) = to_mem_table(q, &data);
//Try access the private table
match run_query(
&ExecutionContext::default(),
&db,
&tx,
&q,
AuthCtx::new(Identity::__dummy(), Identity::from_byte_array([1u8; 32])),
sources,
) {
Ok(_) => {
panic!("it allows to execute against private table")
Expand Down Expand Up @@ -863,6 +872,7 @@ mod tests {
&tx,
q.as_expr(),
AuthCtx::for_testing(),
SourceSet::default(),
)?;
assert_eq!(result.len(), 1, "Join query did not return any rows");
}
Expand Down
Loading