Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl CommitLogMut {

let workload = &ctx.workload();
let db = &ctx.database();
let reducer_or_query = &ctx.reducer_or_query();
let reducer = &ctx.reducer_name();

for record in &tx_data.records {
let table_id: u32 = record.table_id.into();
Expand All @@ -378,7 +378,7 @@ impl CommitLogMut {
// Increment rows inserted metric
DB_METRICS
.rdb_num_rows_inserted
.with_label_values(workload, db, reducer_or_query, &table_id, table_name)
.with_label_values(workload, db, reducer, &table_id, table_name)
.inc();
// Increment table rows gauge
DB_METRICS
Expand All @@ -391,7 +391,7 @@ impl CommitLogMut {
// Increment rows deleted metric
DB_METRICS
.rdb_num_rows_deleted
.with_label_values(workload, db, reducer_or_query, &table_id, table_name)
.with_label_values(workload, db, reducer, &table_id, table_name)
.inc();
// Decrement table rows gauge
DB_METRICS
Expand Down
20 changes: 10 additions & 10 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl TxId {
fn release(self, ctx: &ExecutionContext) {
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let reducer = ctx.reducer_name();
let elapsed_time = self.timer.elapsed();
let cpu_time = elapsed_time - self.lock_wait_time;
DB_METRICS
Expand Down Expand Up @@ -1978,7 +1978,7 @@ impl Drop for Iter<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.into(),
self.table_name,
)
Expand Down Expand Up @@ -2118,7 +2118,7 @@ impl Drop for IndexSeekIterMutTxId<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.0,
table_name,
)
Expand All @@ -2130,7 +2130,7 @@ impl Drop for IndexSeekIterMutTxId<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.0,
table_name,
)
Expand All @@ -2142,7 +2142,7 @@ impl Drop for IndexSeekIterMutTxId<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.0,
table_name,
)
Expand Down Expand Up @@ -2201,7 +2201,7 @@ impl Drop for CommittedIndexIter<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.0,
table_name,
)
Expand All @@ -2213,7 +2213,7 @@ impl Drop for CommittedIndexIter<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.0,
table_name,
)
Expand All @@ -2225,7 +2225,7 @@ impl Drop for CommittedIndexIter<'_> {
.with_label_values(
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_or_query(),
self.ctx.reducer_name(),
&self.table_id.0,
table_name,
)
Expand Down Expand Up @@ -2401,7 +2401,7 @@ impl traits::MutTx for Locking {
fn rollback_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) {
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
DB_METRICS
Expand All @@ -2422,7 +2422,7 @@ impl traits::MutTx for Locking {
fn commit_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) -> super::Result<Option<TxData>> {
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;

Expand Down
17 changes: 9 additions & 8 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,28 @@ metrics_group!(

#[name = spacetime_query_cpu_time_sec]
#[help = "The time spent executing a query (in seconds)"]
#[labels(txn_type: WorkloadType, db: Address, query: str)]
#[labels(txn_type: WorkloadType, db: Address)]
#[buckets(
1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
)]
pub rdb_query_cpu_time_sec: HistogramVec,

#[name = spacetime_query_cpu_time_sec_max]
#[help = "The cpu time of the longest running query (in seconds)"]
#[labels(txn_type: WorkloadType, db: Address, query: str)]
#[labels(txn_type: WorkloadType, db: Address)]
pub rdb_query_cpu_time_sec_max: GaugeVec,

#[name = spacetime_query_compile_time_sec]
#[help = "The time spent compiling a query (in seconds)"]
#[labels(txn_type: WorkloadType, db: Address, query: str)]
#[labels(txn_type: WorkloadType, db: Address)]
#[buckets(
1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
)]
pub rdb_query_compile_time_sec: HistogramVec,

#[name = spacetime_query_compile_time_sec_max]
#[help = "The maximum query compilation time (in seconds)"]
#[labels(txn_type: WorkloadType, db: Address, query: str)]
#[labels(txn_type: WorkloadType, db: Address)]
pub rdb_query_compile_time_sec_max: GaugeVec,

#[name = spacetime_wasm_abi_call_duration_sec]
Expand Down Expand Up @@ -132,11 +132,12 @@ metrics_group!(
}
);

type Triple = (Address, WorkloadType, String);
type ReducerLabel = (Address, WorkloadType, String);
type AddressLabel = (Address, WorkloadType);

pub static MAX_TX_CPU_TIME: Lazy<Mutex<HashMap<Triple, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static MAX_QUERY_CPU_TIME: Lazy<Mutex<HashMap<Triple, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static MAX_QUERY_COMPILE_TIME: Lazy<Mutex<HashMap<Triple, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static MAX_TX_CPU_TIME: Lazy<Mutex<HashMap<ReducerLabel, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static MAX_QUERY_CPU_TIME: Lazy<Mutex<HashMap<AddressLabel, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static MAX_QUERY_COMPILE_TIME: Lazy<Mutex<HashMap<AddressLabel, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static DB_METRICS: Lazy<DbMetrics> = Lazy::new(DbMetrics::new);

pub fn reset_counters() {
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use thiserror::Error;

use crate::client::ClientActorId;
use crate::db::datastore::system_tables::SystemTable;
use crate::sql::query_debug_info::QueryDebugInfo;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_lib::{PrimaryKey, ProductValue};
use spacetimedb_primitives::*;
Expand Down Expand Up @@ -95,7 +94,7 @@ pub enum SubscriptionError {
#[error("Queries with side effects not allowed: {0:?}")]
SideEffect(Crud),
#[error("Unsupported query on subscription: {0:?}")]
Unsupported(QueryDebugInfo),
Unsupported(String),
}

#[derive(Error, Debug)]
Expand Down
42 changes: 6 additions & 36 deletions crates/core/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use spacetimedb_lib::Address;
use spacetimedb_metrics::impl_prometheusvalue_string;
use spacetimedb_metrics::typed_prometheus::AsPrometheusLabel;

use crate::sql::query_debug_info::QueryDebugInfo;

/// Represents the context under which a database runtime method is executed.
/// In particular it provides details about the currently executing txn to runtime operations.
/// More generally it acts as a container for information that database operations may require to function correctly.
Expand All @@ -13,15 +11,8 @@ pub struct ExecutionContext<'a> {
/// The database on which a transaction is being executed.
database: Address,
/// The reducer from which the current transaction originated.
/// Note: this will never be set at the same time as `query`.
reducer: Option<&'a str>,
/// The SQL query being executed, if any.
/// Note: this will never be set at the same time as `reducer`.
/// It is also NOT guaranteed to be set, even if workload == Sql.
/// This is because some transactions tagged "SQL" don't exactly correspond
/// to any particular query.
query_debug_info: Option<&'a QueryDebugInfo>,
// The type of workload that is being executed.
/// The type of workload that is being executed.
workload: WorkloadType,
}

Expand Down Expand Up @@ -52,37 +43,33 @@ impl<'a> ExecutionContext<'a> {
Self {
database,
reducer: Some(name),
query_debug_info: None,
workload: WorkloadType::Reducer,
}
}

/// Returns an [ExecutionContext] for a one-off sql query.
pub fn sql(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
pub fn sql(database: Address) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Sql,
}
}

/// Returns an [ExecutionContext] for an initial subscribe call.
pub fn subscribe(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
pub fn subscribe(database: Address) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Subscribe,
}
}

/// Returns an [ExecutionContext] for a subscription update.
pub fn incremental_update(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
pub fn incremental_update(database: Address) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Update,
}
}
Expand All @@ -92,7 +79,6 @@ impl<'a> ExecutionContext<'a> {
Self {
database,
reducer: None,
query_debug_info: None,
workload: WorkloadType::Internal,
}
}
Expand All @@ -103,26 +89,10 @@ impl<'a> ExecutionContext<'a> {
self.database
}

/// Returns the name of the reducer that is being executed.
/// Returns [None] if this is not a reducer context.
#[inline]
pub fn reducer_name(&self) -> Option<&str> {
self.reducer
}

/// Returns the debug info for the query being executed.
/// Returns [None] if this is not a sql context.
#[inline]
pub fn query_debug_info(&self) -> Option<&QueryDebugInfo> {
self.query_debug_info
}

/// If this is a reducer context, returns the name of the reducer.
/// If this is a query context, returns the query string.
#[inline]
pub fn reducer_or_query(&self) -> &str {
self.reducer
.unwrap_or_else(|| self.query_debug_info.map(|info| info.source()).unwrap_or_default())
pub fn reducer_name(&self) -> &str {
self.reducer.unwrap_or_default()
}

/// Returns the type of workload that is being executed.
Expand Down
34 changes: 15 additions & 19 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::host::{
};
use crate::identity::Identity;
use crate::sql;
use crate::sql::query_debug_info::QueryDebugInfo;
use crate::subscription::module_subscription_actor::ModuleSubscriptionManager;
use crate::util::{const_unwrap, ResultInspectExt};
use crate::worker_metrics::WORKER_METRICS;
Expand Down Expand Up @@ -269,24 +268,21 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
let auth = AuthCtx::new(self.database_instance_context.identity, caller_identity);
// TODO(jgilles): make this a read-only TX when those get added

db.with_read_only(
&ExecutionContext::sql(db.address(), Some(&QueryDebugInfo::from_source(&query))),
|tx| {
log::debug!("One-off query: {query}");

let compiled = sql::compiler::compile_sql(db, tx, &query)?
.into_iter()
.map(|expr| {
if matches!(expr, CrudExpr::Query { .. }) {
Ok(expr)
} else {
Err(anyhow!("One-off queries are not allowed to modify the database"))
}
})
.collect::<Result<_, _>>()?;
sql::execute::execute_sql(db, tx, compiled, Some(&QueryDebugInfo::from_source(&query)), auth)
},
)
db.with_read_only(&ExecutionContext::sql(db.address()), |tx| {
log::debug!("One-off query: {query}");

let compiled = sql::compiler::compile_sql(db, tx, &query)?
.into_iter()
.map(|expr| {
if matches!(expr, CrudExpr::Query { .. }) {
Ok(expr)
} else {
Err(anyhow!("One-off queries are not allowed to modify the database"))
}
})
.collect::<Result<_, _>>()?;
sql::execute::execute_sql(db, tx, compiled, auth)
})
}

fn clear_table(&self, table_name: String) -> Result<(), anyhow::Error> {
Expand Down
10 changes: 3 additions & 7 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use crate::execution_context::ExecutionContext;
use crate::sql::compiler::compile_sql;
use crate::vm::DbProgram;

use super::query_debug_info::QueryDebugInfo;

pub struct StmtResult {
pub schema: ProductType,
pub rows: Vec<ProductValue>,
Expand All @@ -33,8 +31,7 @@ pub fn execute(
info!(sql = sql_text);
if let Some((database_instance_context, _)) = db_inst_ctx_controller.get(database_instance_id) {
let db = &database_instance_context.relational_db;
let info = QueryDebugInfo::from_source(&sql_text);
let ctx = ExecutionContext::sql(db.address(), Some(&info));
let ctx = ExecutionContext::sql(db.address());
db.with_auto_commit(&ctx, |tx| {
run(&database_instance_context.relational_db, tx, &sql_text, auth)
})
Expand Down Expand Up @@ -81,11 +78,10 @@ pub fn execute_sql(
db: &RelationalDB,
tx: &mut MutTx,
ast: Vec<CrudExpr>,
query_debug_info: Option<&QueryDebugInfo>,
auth: AuthCtx,
) -> Result<Vec<MemTable>, DBError> {
let total = ast.len();
let ctx = ExecutionContext::sql(db.address(), query_debug_info);
let ctx = ExecutionContext::sql(db.address());
let p = &mut DbProgram::new(&ctx, db, tx, auth);
let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect());

Expand All @@ -98,7 +94,7 @@ pub fn execute_sql(
#[tracing::instrument(skip_all)]
pub fn run(db: &RelationalDB, tx: &mut MutTx, sql_text: &str, auth: AuthCtx) -> Result<Vec<MemTable>, DBError> {
let ast = compile_sql(db, tx, sql_text)?;
execute_sql(db, tx, ast, Some(&QueryDebugInfo::from_source(sql_text)), auth)
execute_sql(db, tx, ast, auth)
}

#[cfg(test)]
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/sql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod ast;
pub mod compiler;
pub mod execute;
pub mod query_debug_info;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to annotate query metrics with details about the query will need to be reworked.

Loading