Skip to content

Commit 0b011b5

Browse files
perf(832): Remove redundant row deduplication in subscriptions
Closes #832. The database already operates under set semantics, so the result of a single query never contains duplicate rows. Deduplication of the result set is only necessary when multiple queries return rows from the same table.
1 parent a16bbf8 commit 0b011b5

File tree

4 files changed

+336
-296
lines changed

4 files changed

+336
-296
lines changed

crates/bench/benches/subscription.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use spacetimedb::error::DBError;
44
use spacetimedb::execution_context::ExecutionContext;
55
use spacetimedb::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, TableOp};
66
use spacetimedb::subscription::query::compile_read_only_query;
7+
use spacetimedb::subscription::subscription::ExecutionSet;
78
use spacetimedb_lib::identity::AuthCtx;
89
use spacetimedb_primitives::TableId;
910
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue, ToDataKey};
@@ -110,6 +111,7 @@ fn eval(c: &mut Criterion) {
110111
let auth = AuthCtx::for_testing();
111112
let tx = db.begin_tx();
112113
let query = compile_read_only_query(&db, &tx, &auth, scan).unwrap();
114+
let query: ExecutionSet = query.into();
113115

114116
b.iter(|| {
115117
let out = query.eval(&db, &tx, auth).unwrap();
@@ -131,6 +133,7 @@ fn eval(c: &mut Criterion) {
131133
let auth = AuthCtx::for_testing();
132134
let tx = db.begin_tx();
133135
let query = compile_read_only_query(&db, &tx, &auth, &join).unwrap();
136+
let query: ExecutionSet = query.into();
134137

135138
b.iter(|| {
136139
let out = query.eval(&db, &tx, AuthCtx::for_testing()).unwrap();
@@ -148,9 +151,7 @@ fn eval(c: &mut Criterion) {
148151
let tx = db.begin_tx();
149152
let query_lhs = compile_read_only_query(&db, &tx, &auth, select_lhs).unwrap();
150153
let query_rhs = compile_read_only_query(&db, &tx, &auth, select_rhs).unwrap();
151-
152-
let mut query = query_lhs;
153-
query.extend(query_rhs);
154+
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
154155

155156
b.iter(|| {
156157
let out = query.eval_incr(&db, &tx, &update, AuthCtx::for_testing()).unwrap();
@@ -171,6 +172,7 @@ fn eval(c: &mut Criterion) {
171172
let auth = AuthCtx::for_testing();
172173
let tx = db.begin_tx();
173174
let query = compile_read_only_query(&db, &tx, &auth, &join).unwrap();
175+
let query: ExecutionSet = query.into();
174176

175177
b.iter(|| {
176178
let out = query.eval_incr(&db, &tx, &update, AuthCtx::for_testing()).unwrap();

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use super::{
44
query::compile_read_only_query,
5-
subscription::{QuerySet, Subscription},
5+
subscription::{ExecutionSet, Subscription},
66
};
77
use crate::client::{
88
messages::{CachedMessage, SubscriptionUpdateMessage, TransactionUpdateMessage},
@@ -45,12 +45,15 @@ impl ModuleSubscriptions {
4545
});
4646

4747
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
48-
let mut queries = QuerySet::new();
48+
let mut queries = Vec::new();
4949
for sql in subscription.query_strings {
5050
let qset = compile_read_only_query(&self.relational_db, tx, &auth, &sql)?;
5151
queries.extend(qset);
5252
}
5353

54+
let n = queries.len();
55+
let queries = ExecutionSet::from_iter(queries);
56+
5457
let database_update = tokio::task::block_in_place(|| queries.eval(&self.relational_db, tx, auth))?;
5558
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
5659
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
@@ -63,7 +66,6 @@ impl ModuleSubscriptions {
6366
sub
6467
}
6568
None => {
66-
let n = queries.len();
6769
subscriptions.push(Subscription::new(queries, sender));
6870
WORKER_METRICS
6971
.subscription_queries
@@ -100,7 +102,7 @@ impl ModuleSubscriptions {
100102
WORKER_METRICS
101103
.subscription_queries
102104
.with_label_values(&self.relational_db.address())
103-
.sub(subscription.queries.len() as i64);
105+
.sub(subscription.queries.num_queries() as i64);
104106
}
105107
!subscription.subscribers().is_empty()
106108
});

crates/core/src/subscription/query.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::execution_context::{ExecutionContext, WorkloadType};
77
use crate::host::module_host::DatabaseTableUpdate;
88
use crate::sql::compiler::compile_sql;
99
use crate::sql::execute::execute_single_sql;
10-
use crate::subscription::subscription::{QuerySet, SupportedQuery};
10+
use crate::subscription::subscription::SupportedQuery;
1111
use once_cell::sync::Lazy;
1212
use regex::Regex;
1313
use spacetimedb_lib::identity::AuthCtx;
@@ -19,6 +19,8 @@ use spacetimedb_sats::DataKey;
1919
use spacetimedb_vm::expr;
2020
use spacetimedb_vm::expr::{Crud, CrudExpr, DbType, QueryExpr};
2121

22+
use super::subscription::get_all;
23+
2224
static WHITESPACE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\s+").unwrap());
2325
pub const SUBSCRIBE_TO_ALL_QUERY: &str = "SELECT * FROM *";
2426

@@ -103,7 +105,7 @@ pub fn compile_read_only_query(
103105
tx: &Tx,
104106
auth: &AuthCtx,
105107
input: &str,
106-
) -> Result<QuerySet, DBError> {
108+
) -> Result<Vec<SupportedQuery>, DBError> {
107109
let input = input.trim();
108110
if input.is_empty() {
109111
return Err(SubscriptionError::Empty.into());
@@ -112,7 +114,7 @@ pub fn compile_read_only_query(
112114
// Remove redundant whitespace, and in particular newlines, for debug info.
113115
let input = WHITESPACE.replace_all(input, " ");
114116
if input == SUBSCRIBE_TO_ALL_QUERY {
115-
return QuerySet::get_all(relational_db, tx, auth);
117+
return get_all(relational_db, tx, auth);
116118
}
117119

118120
let compiled = compile_sql(relational_db, tx, &input)?;
@@ -204,6 +206,7 @@ mod tests {
204206
use crate::db::relational_db::MutTx;
205207
use crate::host::module_host::{DatabaseUpdate, TableOp};
206208
use crate::sql::execute::run;
209+
use crate::subscription::subscription::ExecutionSet;
207210
use crate::vm::tests::create_table_with_rows;
208211
use itertools::Itertools;
209212
use spacetimedb_lib::error::ResultTest;
@@ -359,7 +362,7 @@ mod tests {
359362
fn check_query_incr(
360363
db: &RelationalDB,
361364
tx: &Tx,
362-
s: &QuerySet,
365+
s: &ExecutionSet,
363366
update: &DatabaseUpdate,
364367
total_tables: usize,
365368
rows: &[ProductValue],
@@ -381,7 +384,7 @@ mod tests {
381384
fn check_query_eval(
382385
db: &RelationalDB,
383386
tx: &Tx,
384-
s: &QuerySet,
387+
s: &ExecutionSet,
385388
total_tables: usize,
386389
rows: &[ProductValue],
387390
) -> ResultTest<()> {
@@ -414,7 +417,7 @@ mod tests {
414417
let table_id = create_table_with_rows(&db, &mut tx, "test", schema.clone(), &[])?;
415418

416419
// select * from test
417-
let query: QuerySet = QueryExpr::new(db_table(schema.clone(), table_id)).try_into()?;
420+
let query: ExecutionSet = QueryExpr::new(db_table(schema.clone(), table_id)).try_into()?;
418421

419422
let op = TableOp {
420423
op_type: 0,
@@ -481,7 +484,7 @@ mod tests {
481484
panic!("unexpected query {:#?}", exp[0]);
482485
};
483486

484-
let query = QuerySet::try_from(query)?;
487+
let query: ExecutionSet = query.try_into()?;
485488

486489
let result = query.eval_incr(&db, &tx, &update, AuthCtx::for_testing())?;
487490

@@ -546,7 +549,8 @@ mod tests {
546549
panic!("unexpected query {:#?}", exp[0]);
547550
};
548551

549-
let query = QuerySet::try_from(query)?;
552+
let query: ExecutionSet = query.try_into()?;
553+
550554
db.release_tx(&ExecutionContext::default(), tx);
551555

552556
fn case_env(
@@ -825,7 +829,7 @@ mod tests {
825829
let s = [q_all, q_id]
826830
.into_iter()
827831
.map(TryFrom::try_from)
828-
.collect::<Result<QuerySet, _>>()?;
832+
.collect::<Result<ExecutionSet, _>>()?;
829833

830834
let row1 = TableOp {
831835
op_type: 0,
@@ -899,7 +903,8 @@ mod tests {
899903
let s = [q_all, q_id]
900904
.into_iter()
901905
.map(TryFrom::try_from)
902-
.collect::<Result<QuerySet, _>>()?;
906+
.collect::<Result<ExecutionSet, _>>()?;
907+
903908
db.commit_tx(&ExecutionContext::default(), tx)?;
904909

905910
let tx = db.begin_tx();
@@ -1009,7 +1014,7 @@ mod tests {
10091014
let row_1 = product!(1u64, "health");
10101015
let row_2 = product!(2u64, "jhon doe");
10111016
let tx = db.begin_tx();
1012-
let s = compile_read_only_query(&db, &tx, &AuthCtx::for_testing(), SUBSCRIBE_TO_ALL_QUERY)?;
1017+
let s = compile_read_only_query(&db, &tx, &AuthCtx::for_testing(), SUBSCRIBE_TO_ALL_QUERY)?.into();
10131018
check_query_eval(&db, &tx, &s, 2, &[row_1.clone(), row_2.clone()])?;
10141019

10151020
let row1 = TableOp {
@@ -1078,14 +1083,14 @@ mod tests {
10781083
"SELECT * FROM lhs WHERE id > 5",
10791084
];
10801085
for scan in scans {
1081-
let expr = compile_read_only_query(&db, &tx, &auth, scan)?.pop_first().unwrap();
1086+
let expr = compile_read_only_query(&db, &tx, &auth, scan)?.pop().unwrap();
10821087
assert_eq!(expr.kind(), Supported::Scan, "{scan}\n{expr:#?}");
10831088
}
10841089

10851090
// Only index semijoins are supported
10861091
let joins = ["SELECT lhs.* FROM lhs JOIN rhs ON lhs.id = rhs.id WHERE rhs.y < 10"];
10871092
for join in joins {
1088-
let expr = compile_read_only_query(&db, &tx, &auth, join)?.pop_first().unwrap();
1093+
let expr = compile_read_only_query(&db, &tx, &auth, join)?.pop().unwrap();
10891094
assert_eq!(expr.kind(), Supported::Semijoin, "{join}\n{expr:#?}");
10901095
}
10911096

0 commit comments

Comments
 (0)