Skip to content

Commit abcf896

Browse files
Refactor DbProgram to borrow transaction (#9)
Co-authored-by: Tyler Cloutier <[email protected]>
1 parent e29a5e5 commit abcf896

File tree

7 files changed

+114
-123
lines changed

7 files changed

+114
-123
lines changed

crates/core/src/db/message_log.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ mod tests {
280280
//let path = "/Users/tylercloutier/Developer/SpacetimeDB/test";
281281
let mut message_log = MessageLog::open(path)?;
282282

283-
const MESSAGE_COUNT: i32 = 100_000_000;
283+
const MESSAGE_COUNT: i32 = 100_000;
284284
let start = std::time::Instant::now();
285285
for _i in 0..MESSAGE_COUNT {
286286
let s = b"yo this is tyler";
@@ -305,7 +305,7 @@ mod tests {
305305
//let path = "/Users/tylercloutier/Developer/SpacetimeDB/test";
306306
let mut message_log = MessageLog::open(path)?;
307307

308-
const MESSAGE_COUNT: i32 = 100_000_000;
308+
const MESSAGE_COUNT: i32 = 100_000;
309309
let start = std::time::Instant::now();
310310
for _i in 0..MESSAGE_COUNT {
311311
let s = b"yo this is tyler";
@@ -323,7 +323,7 @@ mod tests {
323323
drop(message_log);
324324

325325
let message_log = MessageLog::open(path)?;
326-
assert!(message_log.size() == 2_000_000_000);
326+
assert!(message_log.size() == 2_000_000);
327327

328328
Ok(())
329329
}

crates/core/src/db/relational_db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,9 @@ impl RelationalDB {
276276
/// Ok(())
277277
/// })?;
278278
/// ```
279-
pub fn with_auto_commit<F, A, E>(&self, mut f: F) -> Result<A, E>
279+
pub fn with_auto_commit<F, A, E>(&self, f: F) -> Result<A, E>
280280
where
281-
F: FnMut(&mut MutTxId) -> Result<A, E>,
281+
F: FnOnce(&mut MutTxId) -> Result<A, E>,
282282
E: From<DBError>,
283283
{
284284
let mut tx = self.begin_tx();

crates/core/src/host/instance_env.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,8 @@ impl InstanceEnv {
458458
}
459459
}
460460

461-
let stdb = self.dbic.relational_db.clone();
462-
let tx = self.tx.clone();
463-
let tx = &mut *tx.get()?;
461+
let stdb = &self.dbic.relational_db;
462+
let tx = &mut *self.tx.get()?;
464463

465464
let schema = stdb.schema_for_table(tx, table_id)?;
466465
let row_type = ProductType::from(&schema);
@@ -474,7 +473,7 @@ impl InstanceEnv {
474473
)
475474
.map_err(NodesError::DecodeFilter)?;
476475
let q = spacetimedb_vm::dsl::query(&schema).with_select(filter_to_column_op(&schema.table_name, filter));
477-
let p = &mut DbProgram::new((*stdb).clone());
476+
let p = &mut DbProgram::new(stdb, tx);
478477
let results = match spacetimedb_vm::eval::run_ast(p, q.into()) {
479478
Code::Table(table) => table,
480479
_ => unreachable!("query should always return a table"),

crates/core/src/sql/execute.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,23 +51,27 @@ pub fn compile_sql(db: &RelationalDB, sql_text: &str) -> Result<Vec<CrudExpr>, D
5151
}
5252

5353
pub fn execute_single_sql(db: &RelationalDB, ast: CrudExpr) -> Result<Vec<MemTable>, DBError> {
54-
let p = &mut DbProgram::new(db.clone());
55-
let q = Expr::Crud(Box::new(ast));
56-
57-
let mut result = Vec::with_capacity(1);
58-
collect_result(&mut result, run_ast(p, q).into())?;
59-
Ok(result)
54+
db.with_auto_commit(|tx| {
55+
let p = &mut DbProgram::new(db, tx);
56+
let q = Expr::Crud(Box::new(ast));
57+
58+
let mut result = Vec::with_capacity(1);
59+
collect_result(&mut result, run_ast(p, q).into())?;
60+
Ok(result)
61+
})
6062
}
6163

6264
pub fn execute_sql(db: &RelationalDB, ast: Vec<CrudExpr>) -> Result<Vec<MemTable>, DBError> {
63-
let total = ast.len();
65+
db.with_auto_commit(|tx| {
66+
let total = ast.len();
6467

65-
let p = &mut DbProgram::new(db.clone());
66-
let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect());
68+
let p = &mut DbProgram::new(db, tx);
69+
let q = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect());
6770

68-
let mut result = Vec::with_capacity(total);
69-
collect_result(&mut result, run_ast(p, q).into())?;
70-
Ok(result)
71+
let mut result = Vec::with_capacity(total);
72+
collect_result(&mut result, run_ast(p, q).into())?;
73+
Ok(result)
74+
})
7175
}
7276

7377
fn run(db: &RelationalDB, sql_text: &str) -> Result<Vec<MemTable>, DBError> {
@@ -91,9 +95,11 @@ pub(crate) mod tests {
9195
fn create_data(total_rows: u64) -> ResultTest<(RelationalDB, MemTable, TempDir)> {
9296
let (db, tmp_dir) = make_test_db()?;
9397

98+
let mut tx = db.begin_tx();
9499
let head = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
95100
let rows: Vec<_> = (1..=total_rows).map(|i| product!(i, format!("health{i}"))).collect();
96-
create_table_with_rows(&db, "inventory", head.clone(), &rows)?;
101+
create_table_with_rows(&db, &mut tx, "inventory", head.clone(), &rows)?;
102+
db.commit_tx(tx)?;
97103

98104
Ok((db, mem_table(head, rows), tmp_dir))
99105
}
@@ -304,9 +310,11 @@ pub(crate) mod tests {
304310

305311
let (db, _tmp_dir) = make_test_db()?;
306312

307-
create_table_with_rows(&db, "Inventory", data.inv.head.into(), &data.inv.data)?;
308-
create_table_with_rows(&db, "Player", data.player.head.into(), &data.player.data)?;
309-
create_table_with_rows(&db, "Location", data.location.head.into(), &data.location.data)?;
313+
let mut tx = db.begin_tx();
314+
create_table_with_rows(&db, &mut tx, "Inventory", data.inv.head.into(), &data.inv.data)?;
315+
create_table_with_rows(&db, &mut tx, "Player", data.player.head.into(), &data.player.data)?;
316+
create_table_with_rows(&db, &mut tx, "Location", data.location.head.into(), &data.location.data)?;
317+
db.commit_tx(tx)?;
310318

311319
let result = &run(
312320
&db,

crates/core/src/subscription/query.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,17 @@ mod tests {
9797
#[test]
9898
fn test_subscribe() -> ResultTest<()> {
9999
let (db, _tmp_dir) = make_test_db()?;
100-
let p = &mut DbProgram::new(db.clone());
100+
101+
let mut tx = db.begin_tx();
102+
let p = &mut DbProgram::new(&db, &mut tx);
101103

102104
let head = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
103105
let row = product!(1u64, "health");
104106
let table = mem_table(head.clone(), [row.clone()]);
105107
let table_id = create_table_from_program(p, "inventory", head.clone(), &[row.clone()])?;
106108

107-
let tx = db.begin_tx();
108109
let schema = db.schema_for_table(&tx, table_id).unwrap();
109-
db.rollback_tx(tx);
110+
db.commit_tx(tx)?;
110111

111112
let op = TableOp {
112113
op_type: 0,
@@ -149,6 +150,7 @@ mod tests {
149150
Some(table.as_without_table_name()),
150151
result.first().map(|x| x.as_without_table_name())
151152
);
153+
152154
Ok(())
153155
}
154156

@@ -161,15 +163,16 @@ mod tests {
161163
#[test]
162164
fn test_subscribe_dedup() -> ResultTest<()> {
163165
let (db, _tmp_dir) = make_test_db()?;
164-
let p = &mut DbProgram::new(db.clone());
166+
167+
let mut tx = db.begin_tx();
168+
let p = &mut DbProgram::new(&db, &mut tx);
165169

166170
let head = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
167171
let row = product!(1u64, "health");
168172
let table_id = create_table_from_program(p, "inventory", head, &[row.clone()])?;
169173

170-
let tx = db.begin_tx();
171174
let schema = db.schema_for_table(&tx, table_id).unwrap();
172-
db.rollback_tx(tx);
175+
db.commit_tx(tx)?;
173176

174177
//SELECT * FROM inventory
175178
let q_all = QueryExpr::new(db_table((&schema).into(), "inventory", table_id));
@@ -192,21 +195,23 @@ mod tests {
192195
assert_eq!(result.tables.len(), 1, "Must return 1 table");
193196
assert_eq!(result.tables[0].ops.len(), 1, "Must return 1 row");
194197
assert_eq!(result.tables[0].ops[0].row, row, "Must return the correct row");
198+
195199
Ok(())
196200
}
197201

198202
#[test]
199203
fn test_subscribe_dedup_incr() -> ResultTest<()> {
200204
let (db, _tmp_dir) = make_test_db()?;
201-
let p = &mut DbProgram::new(db.clone());
205+
206+
let mut tx = db.begin_tx();
207+
let p = &mut DbProgram::new(&db, &mut tx);
202208

203209
let head = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
204210
let row = product!(1u64, "health");
205211
let table_id = create_table_from_program(p, "inventory", head, &[row.clone()])?;
206212

207-
let tx = db.begin_tx();
208213
let schema = db.schema_for_table(&tx, table_id).unwrap();
209-
db.rollback_tx(tx);
214+
db.commit_tx(tx)?;
210215

211216
//SELECT * FROM inventory
212217
let q_all = QueryExpr::new(db_table((&schema).into(), "inventory", table_id));
@@ -249,6 +254,7 @@ mod tests {
249254
assert_eq!(result.tables.len(), 1, "Must return 1 table");
250255
assert_eq!(result.tables[0].ops.len(), 1, "Must return 1 row");
251256
assert_eq!(result.tables[0].ops[0].row, row, "Must return the correct row");
257+
252258
Ok(())
253259
}
254260
}

crates/core/src/subscription/subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl QuerySet {
101101
///
102102
/// This is a *major* difference with normal query execution, where is expected to return the full result set for each query.
103103
pub fn eval(&self, relational_db: &RelationalDB) -> Result<DatabaseUpdate, DBError> {
104-
let mut database_update = DatabaseUpdate { tables: vec![] };
104+
let mut database_update: DatabaseUpdate = DatabaseUpdate { tables: vec![] };
105105
let mut seen = HashSet::new();
106106

107107
for query in &self.0 {

0 commit comments

Comments
 (0)