diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs index 09f621cab4d..1db7d01ee00 100644 --- a/crates/core/src/db/commit_log.rs +++ b/crates/core/src/db/commit_log.rs @@ -367,7 +367,7 @@ impl CommitLogMut { let workload = &ctx.workload(); let db = &ctx.database(); - let reducer_or_query = &ctx.reducer_or_query(); + let reducer = &ctx.reducer_name(); for record in &tx_data.records { let table_id: u32 = record.table_id.into(); @@ -378,7 +378,7 @@ impl CommitLogMut { // Increment rows inserted metric DB_METRICS .rdb_num_rows_inserted - .with_label_values(workload, db, reducer_or_query, &table_id, table_name) + .with_label_values(workload, db, reducer, &table_id, table_name) .inc(); // Increment table rows gauge DB_METRICS @@ -391,7 +391,7 @@ impl CommitLogMut { // Increment rows deleted metric DB_METRICS .rdb_num_rows_deleted - .with_label_values(workload, db, reducer_or_query, &table_id, table_name) + .with_label_values(workload, db, reducer, &table_id, table_name) .inc(); // Decrement table rows gauge DB_METRICS diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index e5e30ab90b4..ac6f2e41c48 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -339,7 +339,7 @@ impl TxId { fn release(self, ctx: &ExecutionContext) { let workload = &ctx.workload(); let db = &ctx.database(); - let reducer = ctx.reducer_name().unwrap_or_default(); + let reducer = ctx.reducer_name(); let elapsed_time = self.timer.elapsed(); let cpu_time = elapsed_time - self.lock_wait_time; DB_METRICS @@ -1978,7 +1978,7 @@ impl Drop for Iter<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.into(), self.table_name, ) @@ -2118,7 +2118,7 @@ impl Drop for IndexSeekIterMutTxId<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.0, table_name, ) @@ -2130,7 +2130,7 @@ impl Drop for IndexSeekIterMutTxId<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.0, table_name, ) @@ -2142,7 +2142,7 @@ impl Drop for IndexSeekIterMutTxId<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.0, table_name, ) @@ -2201,7 +2201,7 @@ impl Drop for CommittedIndexIter<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.0, table_name, ) @@ -2213,7 +2213,7 @@ impl Drop for CommittedIndexIter<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.0, table_name, ) @@ -2225,7 +2225,7 @@ impl Drop for CommittedIndexIter<'_> { .with_label_values( &self.ctx.workload(), &self.ctx.database(), - self.ctx.reducer_or_query(), + self.ctx.reducer_name(), &self.table_id.0, table_name, ) @@ -2401,7 +2401,7 @@ impl traits::MutTx for Locking { fn rollback_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) { let workload = &ctx.workload(); let db = &ctx.database(); - let reducer = ctx.reducer_name().unwrap_or_default(); + let reducer = ctx.reducer_name(); let elapsed_time = tx.timer.elapsed(); let cpu_time = elapsed_time - tx.lock_wait_time; DB_METRICS @@ -2422,7 +2422,7 @@ impl traits::MutTx for Locking { fn commit_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) -> super::Result> { let workload = &ctx.workload(); let db = &ctx.database(); - let reducer = ctx.reducer_name().unwrap_or_default(); + let reducer = ctx.reducer_name(); let elapsed_time = tx.timer.elapsed(); let cpu_time = elapsed_time - tx.lock_wait_time; diff --git a/crates/core/src/db/db_metrics/mod.rs b/crates/core/src/db/db_metrics/mod.rs index 192f768c72e..bc69bc6d3e2 100644 --- a/crates/core/src/db/db_metrics/mod.rs +++ b/crates/core/src/db/db_metrics/mod.rs @@ -83,7 +83,7 @@ metrics_group!( #[name = spacetime_query_cpu_time_sec] #[help = "The time spent executing a query (in seconds)"] - #[labels(txn_type: WorkloadType, db: Address, query: str)] + #[labels(txn_type: WorkloadType, db: Address)] #[buckets( 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 )] @@ -91,12 +91,12 @@ metrics_group!( #[name = spacetime_query_cpu_time_sec_max] #[help = "The cpu time of the longest running query (in seconds)"] - #[labels(txn_type: WorkloadType, db: Address, query: str)] + #[labels(txn_type: WorkloadType, db: Address)] pub rdb_query_cpu_time_sec_max: GaugeVec, #[name = spacetime_query_compile_time_sec] #[help = "The time spent compiling a query (in seconds)"] - #[labels(txn_type: WorkloadType, db: Address, query: str)] + #[labels(txn_type: WorkloadType, db: Address)] #[buckets( 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 )] @@ -104,7 +104,7 @@ metrics_group!( #[name = spacetime_query_compile_time_sec_max] #[help = "The maximum query compilation time (in seconds)"] - #[labels(txn_type: WorkloadType, db: Address, query: str)] + #[labels(txn_type: WorkloadType, db: Address)] pub rdb_query_compile_time_sec_max: GaugeVec, #[name = spacetime_wasm_abi_call_duration_sec] @@ -132,11 +132,12 @@ metrics_group!( } ); -type Triple = (Address, WorkloadType, String); +type ReducerLabel = (Address, WorkloadType, String); +type AddressLabel = (Address, WorkloadType); -pub static MAX_TX_CPU_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); -pub static MAX_QUERY_CPU_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); -pub static MAX_QUERY_COMPILE_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +pub static MAX_TX_CPU_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +pub static MAX_QUERY_CPU_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +pub static MAX_QUERY_COMPILE_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); pub static DB_METRICS: Lazy = Lazy::new(DbMetrics::new); pub fn reset_counters() { diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index c4675b5835e..a6151873482 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -8,7 +8,6 @@ use thiserror::Error; use crate::client::ClientActorId; use crate::db::datastore::system_tables::SystemTable; -use crate::sql::query_debug_info::QueryDebugInfo; use spacetimedb_lib::buffer::DecodeError; use spacetimedb_lib::{PrimaryKey, ProductValue}; use spacetimedb_primitives::*; @@ -95,7 +94,7 @@ pub enum SubscriptionError { #[error("Queries with side effects not allowed: {0:?}")] SideEffect(Crud), #[error("Unsupported query on subscription: {0:?}")] - Unsupported(QueryDebugInfo), + Unsupported(String), } #[derive(Error, Debug)] diff --git a/crates/core/src/execution_context.rs b/crates/core/src/execution_context.rs index fb411fe3d75..fbd53937368 100644 --- a/crates/core/src/execution_context.rs +++ b/crates/core/src/execution_context.rs @@ -3,8 +3,6 @@ use spacetimedb_lib::Address; use spacetimedb_metrics::impl_prometheusvalue_string; use spacetimedb_metrics::typed_prometheus::AsPrometheusLabel; -use crate::sql::query_debug_info::QueryDebugInfo; - /// Represents the context under which a database runtime method is executed. /// In particular it provides details about the currently executing txn to runtime operations. /// More generally it acts as a container for information that database operations may require to function correctly. @@ -13,15 +11,8 @@ pub struct ExecutionContext<'a> { /// The database on which a transaction is being executed. database: Address, /// The reducer from which the current transaction originated. - /// Note: this will never be set at the same time as `query`. reducer: Option<&'a str>, - /// The SQL query being executed, if any. - /// Note: this will never be set at the same time as `reducer`. - /// It is also NOT guaranteed to be set, even if workload == Sql. - /// This is because some transactions tagged "SQL" don't exactly correspond - /// to any particular query. - query_debug_info: Option<&'a QueryDebugInfo>, - // The type of workload that is being executed. + /// The type of workload that is being executed. workload: WorkloadType, } @@ -52,37 +43,33 @@ impl<'a> ExecutionContext<'a> { Self { database, reducer: Some(name), - query_debug_info: None, workload: WorkloadType::Reducer, } } /// Returns an [ExecutionContext] for a one-off sql query. - pub fn sql(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self { + pub fn sql(database: Address) -> Self { Self { database, reducer: None, - query_debug_info, workload: WorkloadType::Sql, } } /// Returns an [ExecutionContext] for an initial subscribe call. - pub fn subscribe(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self { + pub fn subscribe(database: Address) -> Self { Self { database, reducer: None, - query_debug_info, workload: WorkloadType::Subscribe, } } /// Returns an [ExecutionContext] for a subscription update. - pub fn incremental_update(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self { + pub fn incremental_update(database: Address) -> Self { Self { database, reducer: None, - query_debug_info, workload: WorkloadType::Update, } } @@ -92,7 +79,6 @@ impl<'a> ExecutionContext<'a> { Self { database, reducer: None, - query_debug_info: None, workload: WorkloadType::Internal, } } @@ -103,26 +89,10 @@ impl<'a> ExecutionContext<'a> { self.database } - /// Returns the name of the reducer that is being executed. - /// Returns [None] if this is not a reducer context. - #[inline] - pub fn reducer_name(&self) -> Option<&str> { - self.reducer - } - - /// Returns the debug info for the query being executed. - /// Returns [None] if this is not a sql context. - #[inline] - pub fn query_debug_info(&self) -> Option<&QueryDebugInfo> { - self.query_debug_info - } - /// If this is a reducer context, returns the name of the reducer. - /// If this is a query context, returns the query string. #[inline] - pub fn reducer_or_query(&self) -> &str { - self.reducer - .unwrap_or_else(|| self.query_debug_info.map(|info| info.source()).unwrap_or_default()) + pub fn reducer_name(&self) -> &str { + self.reducer.unwrap_or_default() } /// Returns the type of workload that is being executed. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 34461289f46..35e974cf379 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -26,7 +26,6 @@ use crate::host::{ }; use crate::identity::Identity; use crate::sql; -use crate::sql::query_debug_info::QueryDebugInfo; use crate::subscription::module_subscription_actor::ModuleSubscriptionManager; use crate::util::{const_unwrap, ResultInspectExt}; use crate::worker_metrics::WORKER_METRICS; @@ -269,24 +268,21 @@ impl Module for WasmModuleHostActor { let auth = AuthCtx::new(self.database_instance_context.identity, caller_identity); // TODO(jgilles): make this a read-only TX when those get added - db.with_read_only( - &ExecutionContext::sql(db.address(), Some(&QueryDebugInfo::from_source(&query))), - |tx| { - log::debug!("One-off query: {query}"); - - let compiled = sql::compiler::compile_sql(db, tx, &query)? - .into_iter() - .map(|expr| { - if matches!(expr, CrudExpr::Query { .. }) { - Ok(expr) - } else { - Err(anyhow!("One-off queries are not allowed to modify the database")) - } - }) - .collect::>()?; - sql::execute::execute_sql(db, tx, compiled, Some(&QueryDebugInfo::from_source(&query)), auth) - }, - ) + db.with_read_only(&ExecutionContext::sql(db.address()), |tx| { + log::debug!("One-off query: {query}"); + + let compiled = sql::compiler::compile_sql(db, tx, &query)? + .into_iter() + .map(|expr| { + if matches!(expr, CrudExpr::Query { .. }) { + Ok(expr) + } else { + Err(anyhow!("One-off queries are not allowed to modify the database")) + } + }) + .collect::>()?; + sql::execute::execute_sql(db, tx, compiled, auth) + }) } fn clear_table(&self, table_name: String) -> Result<(), anyhow::Error> { diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 6a94922772c..bbc0034247c 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -12,8 +12,6 @@ use crate::execution_context::ExecutionContext; use crate::sql::compiler::compile_sql; use crate::vm::DbProgram; -use super::query_debug_info::QueryDebugInfo; - pub struct StmtResult { pub schema: ProductType, pub rows: Vec, @@ -33,8 +31,7 @@ pub fn execute( info!(sql = sql_text); if let Some((database_instance_context, _)) = db_inst_ctx_controller.get(database_instance_id) { let db = &database_instance_context.relational_db; - let info = QueryDebugInfo::from_source(&sql_text); - let ctx = ExecutionContext::sql(db.address(), Some(&info)); + let ctx = ExecutionContext::sql(db.address()); db.with_auto_commit(&ctx, |tx| { run(&database_instance_context.relational_db, tx, &sql_text, auth) }) @@ -81,11 +78,10 @@ pub fn execute_sql( db: &RelationalDB, tx: &mut MutTx, ast: Vec, - query_debug_info: Option<&QueryDebugInfo>, auth: AuthCtx, ) -> Result, DBError> { let total = ast.len(); - let ctx = ExecutionContext::sql(db.address(), query_debug_info); + let ctx = ExecutionContext::sql(db.address()); let p = &mut DbProgram::new(&ctx, db, tx, auth); let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect()); @@ -98,7 +94,7 @@ pub fn execute_sql( #[tracing::instrument(skip_all)] pub fn run(db: &RelationalDB, tx: &mut MutTx, sql_text: &str, auth: AuthCtx) -> Result, DBError> { let ast = compile_sql(db, tx, sql_text)?; - execute_sql(db, tx, ast, Some(&QueryDebugInfo::from_source(sql_text)), auth) + execute_sql(db, tx, ast, auth) } #[cfg(test)] diff --git a/crates/core/src/sql/mod.rs b/crates/core/src/sql/mod.rs index bfce32c731c..4ba53d68433 100644 --- a/crates/core/src/sql/mod.rs +++ b/crates/core/src/sql/mod.rs @@ -1,4 +1,3 @@ pub mod ast; pub mod compiler; pub mod execute; -pub mod query_debug_info; diff --git a/crates/core/src/sql/query_debug_info.rs b/crates/core/src/sql/query_debug_info.rs deleted file mode 100644 index eaece875474..00000000000 --- a/crates/core/src/sql/query_debug_info.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::sync::Arc; - -/// Used to store the source of a SQL query in a way that can be cheaply cloned, -/// without proliferating lifetimes everywhere. -/// -/// TODO: if CrudExpr ever gets refactored, this should probably be attached to those. -/// That would be a large refactoring though. It would be nice if we could get -/// more precise spans from sqlparser. We could stick all sorts of other things in here too. -#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] -pub struct QueryDebugInfo(Arc); - -impl QueryDebugInfo { - /// Create a new [QueryDebugInfo] from the given source text. - pub fn from_source>(source: T) -> Self { - Self(source.as_ref().into()) - } - - /// Get the source text of the query, if available. - pub fn source(&self) -> &str { - &self.0 - } -} diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 1083fdc0715..e76ee087791 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -170,7 +170,7 @@ impl ModuleSubscriptionActor { // Note: the missing QueryDebugInfo here is only used for finishing the transaction; // all of the relevant queries already executed, with debug info, in _add_subscription - let ctx = ExecutionContext::subscribe(self.relational_db.address(), None); + let ctx = ExecutionContext::subscribe(self.relational_db.address()); self.relational_db.finish_tx(&ctx, tx, result) } @@ -218,7 +218,7 @@ impl ModuleSubscriptionActor { //Split logic to properly handle `Error` + `Tx` let mut tx = self.relational_db.begin_tx(); let result = self._broadcast_commit_event(event, &mut tx).await; - let ctx = ExecutionContext::incremental_update(self.relational_db.address(), None); + let ctx = ExecutionContext::incremental_update(self.relational_db.address()); self.relational_db.finish_tx(&ctx, tx, result) } } diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 0c133bc9569..b07301218d3 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -8,7 +8,6 @@ use crate::execution_context::{ExecutionContext, WorkloadType}; use crate::host::module_host::DatabaseTableUpdate; use crate::sql::compiler::compile_sql; use crate::sql::execute::execute_single_sql; -use crate::sql::query_debug_info::QueryDebugInfo; use crate::subscription::subscription::{QuerySet, SupportedQuery}; use once_cell::sync::Lazy; use regex::Regex; @@ -134,12 +133,10 @@ pub fn compile_read_only_query( } } - let info = QueryDebugInfo::from_source(input); - if !queries.is_empty() { Ok(queries .into_iter() - .map(|query| SupportedQuery::new(query, info.clone())) + .map(|query| SupportedQuery::new(query, input.to_string())) .collect::>()?) } else { Err(SubscriptionError::Empty.into()) @@ -153,13 +150,13 @@ fn record_query_compilation_metrics(workload: WorkloadType, db: &Address, query: DB_METRICS .rdb_query_compile_time_sec - .with_label_values(&workload, db, query) + .with_label_values(&workload, db) .observe(compile_duration); let max_compile_duration = *MAX_QUERY_COMPILE_TIME .lock() .unwrap() - .entry((*db, workload, query.to_owned())) + .entry((*db, workload)) .and_modify(|max| { if compile_duration > *max { *max = compile_duration; @@ -169,7 +166,7 @@ fn record_query_compilation_metrics(workload: WorkloadType, db: &Address, query: DB_METRICS .rdb_query_compile_time_sec_max - .with_label_values(&workload, db, query) + .with_label_values(&workload, db) .set(max_compile_duration); } diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 8d36b733b11..e33416302d7 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -33,7 +33,6 @@ use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::db::db_metrics::{DB_METRICS, MAX_QUERY_CPU_TIME}; use crate::error::{DBError, SubscriptionError}; use crate::execution_context::{ExecutionContext, WorkloadType}; -use crate::sql::query_debug_info::QueryDebugInfo; use crate::subscription::query::{run_query, to_mem_table_with_op_type, OP_TYPE_FIELD_NAME}; use crate::{ client::{ClientActorId, ClientConnectionSender}, @@ -87,13 +86,12 @@ impl Subscription { pub struct SupportedQuery { kind: query::Supported, expr: QueryExpr, - info: QueryDebugInfo, } impl SupportedQuery { - pub fn new(expr: QueryExpr, info: QueryDebugInfo) -> Result { - let kind = query::classify(&expr).ok_or_else(|| SubscriptionError::Unsupported(info.clone()))?; - Ok(Self { kind, expr, info }) + pub fn new(expr: QueryExpr, text: String) -> Result { + let kind = query::classify(&expr).ok_or(SubscriptionError::Unsupported(text))?; + Ok(Self { kind, expr }) } pub fn kind(&self) -> query::Supported { @@ -111,11 +109,7 @@ impl TryFrom for SupportedQuery { fn try_from(expr: QueryExpr) -> Result { let kind = query::classify(&expr).context("Unsupported query expression")?; - Ok(Self { - kind, - expr, - info: QueryDebugInfo::from_source(""), - }) + Ok(Self { kind, expr }) } } @@ -188,19 +182,15 @@ fn evaluator_for_secondary_updates( db: &RelationalDB, auth: AuthCtx, inserts: bool, -) -> impl Fn(&mut MutTxId, &QueryExpr, &QueryDebugInfo) -> Result, DBError> + '_ { - move |tx, query, info| { +) -> impl Fn(&mut MutTxId, &QueryExpr) -> Result, DBError> + '_ { + move |tx, query| { let mut out = HashMap::new(); // If we are evaluating inserts, the op type should be 1. // Otherwise we are evaluating deletes, and the op type should be 0. let op_type = if inserts { 1 } else { 0 }; - for MemTable { data, .. } in run_query( - &ExecutionContext::incremental_update(db.address(), Some(info)), - db, - tx, - query, - auth, - )? { + for MemTable { data, .. } in + run_query(&ExecutionContext::incremental_update(db.address()), db, tx, query, auth)? + { for row in data { let row_pk = pk_for_row(&row); let row = row.data; @@ -218,16 +208,12 @@ fn evaluator_for_secondary_updates( fn evaluator_for_primary_updates( db: &RelationalDB, auth: AuthCtx, -) -> impl Fn(&mut MutTxId, &QueryExpr, &QueryDebugInfo) -> Result, DBError> + '_ { - move |tx, query, info| { +) -> impl Fn(&mut MutTxId, &QueryExpr) -> Result, DBError> + '_ { + move |tx, query| { let mut out = HashMap::new(); - for MemTable { data, head, .. } in run_query( - &ExecutionContext::incremental_update(db.address(), Some(info)), - db, - tx, - query, - auth, - )? { + for MemTable { data, head, .. } in + run_query(&ExecutionContext::incremental_update(db.address()), db, tx, query, auth)? + { // Remove the special __op_type field before computing each row's primary key. let pos_op_type = head.find_pos_by_name(OP_TYPE_FIELD_NAME).unwrap_or_else(|| { panic!( @@ -270,7 +256,6 @@ impl QuerySet { .map(|src| SupportedQuery { kind: query::Supported::Scan, expr: QueryExpr::new(src), - info: QueryDebugInfo::from_source(format!("SELECT * FROM {}", src.table_name)), }) .collect(); @@ -295,7 +280,7 @@ impl QuerySet { let mut seen = HashSet::new(); let eval = evaluator_for_primary_updates(db, auth); - for SupportedQuery { kind, expr, info } in self { + for SupportedQuery { kind, expr, .. } in self { use query::Supported::*; let start = Instant::now(); match kind { @@ -314,7 +299,7 @@ impl QuerySet { let plan = query::to_mem_table(expr.clone(), table); // Evaluate the new plan and capture the new row operations - for op in eval(tx, &plan, info)? + for op in eval(tx, &plan)? .into_iter() .filter_map(|(row_pk, op)| seen.insert((table.table_id, row_pk)).then(|| op.into())) { @@ -323,7 +308,7 @@ impl QuerySet { } } Semijoin => { - if let Some(plan) = IncrementalJoin::new(expr, info, database_update.tables.iter())? { + if let Some(plan) = IncrementalJoin::new(expr, database_update.tables.iter())? { let table = plan.left_table(); let table_id = table.table_id; let header = &table.head; @@ -343,7 +328,7 @@ impl QuerySet { } } } - record_query_duration_metrics(WorkloadType::Update, &db.address(), info.source(), start); + record_query_duration_metrics(WorkloadType::Update, &db.address(), start); } for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) { output.tables.push(DatabaseTableUpdate { @@ -368,20 +353,14 @@ impl QuerySet { let mut table_ops = HashMap::new(); let mut seen = HashSet::new(); - for SupportedQuery { expr, info, .. } in self { + for SupportedQuery { expr, .. } in self { if let Some(t) = expr.source.get_db_table() { let start = Instant::now(); // Get the TableOps for this table let (_, table_row_operations) = table_ops .entry(t.table_id) .or_insert_with(|| (t.head.table_name.clone(), vec![])); - for table in run_query( - &ExecutionContext::subscribe(db.address(), Some(info)), - db, - tx, - expr, - auth, - )? { + for table in run_query(&ExecutionContext::subscribe(db.address()), db, tx, expr, auth)? { for row in table.data { let row_pk = pk_for_row(&row); @@ -400,7 +379,7 @@ impl QuerySet { }); } } - record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), info.source(), start); + record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), start); } } for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) { @@ -414,18 +393,18 @@ impl QuerySet { } } -fn record_query_duration_metrics(workload: WorkloadType, db: &Address, query: &str, start: Instant) { +fn record_query_duration_metrics(workload: WorkloadType, db: &Address, start: Instant) { let query_duration = start.elapsed().as_secs_f64(); DB_METRICS .rdb_query_cpu_time_sec - .with_label_values(&workload, db, query) + .with_label_values(&workload, db) .observe(query_duration); let max_query_duration = *MAX_QUERY_CPU_TIME .lock() .unwrap() - .entry((*db, workload, query.to_owned())) + .entry((*db, workload)) .and_modify(|max| { if query_duration > *max { *max = query_duration; @@ -435,7 +414,7 @@ fn record_query_duration_metrics(workload: WorkloadType, db: &Address, query: &s DB_METRICS .rdb_query_cpu_time_sec_max - .with_label_values(&workload, db, query) + .with_label_values(&workload, db) .set(max_query_duration); } @@ -462,7 +441,6 @@ impl From for TableOp { /// Helper for evaluating a [`query::Supported::Semijoin`]. struct IncrementalJoin<'a> { join: &'a IndexJoin, - info: &'a QueryDebugInfo, index_side: JoinSide<'a>, probe_side: JoinSide<'a>, } @@ -513,7 +491,6 @@ impl<'a> IncrementalJoin<'a> { /// An error is returned if the expression is not well-formed. pub fn new( join: &'a QueryExpr, - info: &'a QueryDebugInfo, updates: impl Iterator, ) -> anyhow::Result> { if join.query.len() != 1 { @@ -579,7 +556,6 @@ impl<'a> IncrementalJoin<'a> { Ok(Some(Self { join, - info, index_side, probe_side, })) @@ -637,9 +613,9 @@ impl<'a> IncrementalJoin<'a> { // A query evaluator for inserts let mut eval = |query, is_primary| { if is_primary { - evaluator_for_primary_updates(db, *auth)(tx, query, self.info) + evaluator_for_primary_updates(db, *auth)(tx, query) } else { - evaluator_for_secondary_updates(db, *auth, true)(tx, query, self.info) + evaluator_for_secondary_updates(db, *auth, true)(tx, query) } }; @@ -665,9 +641,9 @@ impl<'a> IncrementalJoin<'a> { // A query evaluator for deletes let mut eval = |query, is_primary| { if is_primary { - evaluator_for_primary_updates(db, *auth)(tx, query, self.info) + evaluator_for_primary_updates(db, *auth)(tx, query) } else { - evaluator_for_secondary_updates(db, *auth, false)(tx, query, self.info) + evaluator_for_secondary_updates(db, *auth, false)(tx, query) } }; diff --git a/crates/sqltest/src/space.rs b/crates/sqltest/src/space.rs index 9d9bf3afdc7..13e66f65cf3 100644 --- a/crates/sqltest/src/space.rs +++ b/crates/sqltest/src/space.rs @@ -5,7 +5,6 @@ use spacetimedb::error::DBError; use spacetimedb::execution_context::ExecutionContext; use spacetimedb::sql::compiler::compile_sql; use spacetimedb::sql::execute::execute_sql; -use spacetimedb::sql::query_debug_info::QueryDebugInfo; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_sats::meta_type::MetaType; use spacetimedb_sats::relation::MemTable; @@ -83,7 +82,7 @@ impl SpaceDb { pub(crate) fn run_sql(&self, sql: &str) -> anyhow::Result> { self.conn.with_auto_commit(&ExecutionContext::default(), |tx| { let ast = compile_sql(&self.conn, tx, sql)?; - let result = execute_sql(&self.conn, tx, ast, Some(&QueryDebugInfo::from_source(sql)), self.auth)?; + let result = execute_sql(&self.conn, tx, ast, self.auth)?; //remove comments to see which SQL worked. Can't collect it outside from lack of a hook in the external `sqllogictest` crate... :( //append_file(&std::path::PathBuf::from(".ok.sql"), sql)?; Ok(result)