Skip to content

Commit 53c5f10

Browse files
cloutiertylerjdetter
authored andcommitted
Revert "[ABI] Remove the special first element of iterator (#420)"
This reverts commit 469dff6.
1 parent 7623404 commit 53c5f10

File tree

7 files changed

+111
-26
lines changed

7 files changed

+111
-26
lines changed

crates/bindings-csharp/Runtime/Runtime.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,20 @@ public void Reset()
140140

141141
public class RawTableIter : IEnumerable<byte[]>
142142
{
143-
private readonly uint tableId;
144-
private readonly byte[]? filterBytes;
143+
public readonly byte[] Schema;
144+
145+
private readonly IEnumerator<byte[]> iter;
145146

146147
public RawTableIter(uint tableId, byte[]? filterBytes = null)
147148
{
148-
this.tableId = tableId;
149-
this.filterBytes = filterBytes;
149+
iter = new BufferIter(tableId, filterBytes);
150+
iter.MoveNext();
151+
Schema = iter.Current;
150152
}
151153

152154
public IEnumerator<byte[]> GetEnumerator()
153155
{
154-
return new BufferIter(tableId, filterBytes);
156+
return iter;
155157
}
156158

157159
IEnumerator IEnumerable.GetEnumerator()

crates/bindings-csharp/Runtime/bindings.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ static MonoArray* stdb_buffer_consume(Buffer buf);
100100
// return out;
101101
// }
102102

103-
#define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_7." #minor
103+
#define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_6." #minor
104104
#define STDB_IMPORT_MODULE STDB_IMPORT_MODULE_MINOR(0)
105105

106106
__attribute__((import_module(STDB_IMPORT_MODULE),

crates/bindings-sys/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub mod raw {
2424
// on. Any non-breaking additions to the abi surface should be put in a new `extern {}` block
2525
// with a module identifier with a minor version 1 above the previous highest minor version.
2626
// For breaking changes, all functions should be moved into one new `spacetime_X.0` block.
27-
#[link(wasm_import_module = "spacetime_7.0")]
27+
#[link(wasm_import_module = "spacetime_6.0")]
2828
extern "C" {
2929
/*
3030
/// Create a table with `name`, a UTF-8 slice in WASM memory lasting `name_len` bytes,

crates/bindings/src/lib.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,52 @@ pub fn delete_by_col_eq(table_id: TableId, col_id: u8, value: &impl Serialize) -
198198
})
199199
}
200200

201-
/// A table iterator which yields values of the `TableType` corresponding to the table.
202-
type TableTypeTableIter<T> = RawTableIter<TableTypeBufferDeserialize<T>>;
201+
/*
202+
pub fn delete_pk(table_id: u32, primary_key: &PrimaryKey) -> Result<()> {
203+
with_row_buf(|bytes| {
204+
primary_key.encode(bytes);
205+
sys::delete_pk(table_id, bytes)
206+
})
207+
}
208+
209+
pub fn delete_filter<F: Fn(&ProductValue) -> bool>(table_id: u32, f: F) -> Result<usize> {
210+
with_row_buf(|bytes| {
211+
let mut count = 0;
212+
for tuple_value in pv_table_iter(table_id, None)? {
213+
if f(&tuple_value) {
214+
count += 1;
215+
bytes.clear();
216+
tuple_value.encode(bytes);
217+
sys::delete_value(table_id, bytes)?;
218+
}
219+
}
220+
Ok(count)
221+
})
222+
}
223+
224+
pub fn delete_range(table_id: u32, col_id: u8, range: Range<AlgebraicValue>) -> Result<u32> {
225+
with_row_buf(|bytes| {
226+
range.start.encode(bytes);
227+
let mid = bytes.len();
228+
range.end.encode(bytes);
229+
let (range_start, range_end) = bytes.split_at(mid);
230+
sys::delete_range(table_id, col_id.into(), range_start, range_end)
231+
})
232+
}
233+
*/
203234

204-
// Get the iterator for this table with an optional filter,
205-
fn table_iter<T: TableType>(table_id: TableId, filter: Option<spacetimedb_lib::filter::Expr>) -> Result<TableIter<T>> {
235+
//
236+
// fn page_table(table_id : u32, pager_token : u32, read_entries : u32) {
237+
//
238+
// }
239+
240+
// Get the buffer iterator for this table,
241+
// with an optional filter,
242+
// and return it and its decoded `ProductType` schema.
243+
fn buffer_table_iter(
244+
table_id: u32,
245+
filter: Option<spacetimedb_lib::filter::Expr>,
246+
) -> Result<(BufferIter, ProductType)> {
206247
// Decode the filter, if any.
207248
let filter = filter
208249
.as_ref()
@@ -211,8 +252,35 @@ fn table_iter<T: TableType>(table_id: TableId, filter: Option<spacetimedb_lib::f
211252
.expect("Couldn't decode the filter query");
212253

213254
// Create the iterator.
214-
let iter = sys::iter(table_id, filter.as_deref())?;
255+
let mut iter = sys::iter(table_id, filter.as_deref())?;
256+
257+
// First item is an encoded schema.
258+
let schema_raw = iter
259+
.next()
260+
.expect("Missing schema")
261+
.expect("Failed to get schema")
262+
.read();
263+
let schema = decode_schema(&mut &schema_raw[..]).expect("Could not decode schema");
264+
265+
Ok((iter, schema))
266+
}
267+
268+
/// A table iterator which yields `ProductValue`s.
269+
// type ProductValueTableIter = RawTableIter<ProductValue, ProductValueBufferDeserialize>;
270+
271+
// fn pv_table_iter(table_id: u32, filter: Option<spacetimedb_lib::filter::Expr>) -> Result<ProductValueTableIter> {
272+
// let (iter, schema) = buffer_table_iter(table_id, filter)?;
273+
// let deserializer = ProductValueBufferDeserialize::new(schema);
274+
// Ok(RawTableIter::new(iter, deserializer))
275+
// }
276+
277+
/// A table iterator which yields values of the `TableType` corresponding to the table.
278+
type TableTypeTableIter<T> = RawTableIter<TableTypeBufferDeserialize<T>>;
215279

280+
fn table_iter<T: TableType>(table_id: u32, filter: Option<spacetimedb_lib::filter::Expr>) -> Result<TableIter<T>> {
281+
// The TableType deserializer doesn't need the schema, as we have type-directed
282+
// dispatch to deserialize any given `TableType`.
283+
let (iter, _schema) = buffer_table_iter(table_id, filter)?;
216284
let deserializer = TableTypeBufferDeserialize::new();
217285
Ok(RawTableIter::new(iter, deserializer).into())
218286
}

crates/core/src/host/instance_env.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,9 @@ impl BufWriter for ChunkedWriter {
5050
}
5151

5252
impl ChunkedWriter {
53-
/// Flushes the data collected in the scratch space if it's larger than our
54-
/// chunking threshold.
55-
pub fn flush(&mut self) {
56-
// For now, just send buffers over a certain fixed size.
57-
const ITER_CHUNK_SIZE: usize = 64 * 1024;
58-
59-
if self.scratch_space.len() > ITER_CHUNK_SIZE {
53+
/// Flushes the currently populated part of the scratch space as a new chunk.
54+
pub fn force_flush(&mut self) {
55+
if !self.scratch_space.is_empty() {
6056
// We intentionally clone here so that our scratch space is not
6157
// recreated with zero capacity (via `Vec::new`), but instead can
6258
// be `.clear()`ed in-place and reused.
@@ -70,11 +66,22 @@ impl ChunkedWriter {
7066
}
7167
}
7268

69+
/// Similar to [`Self::force_flush`], but only flushes if the data in the
70+
/// scratch space is larger than our chunking threshold.
71+
pub fn flush(&mut self) {
72+
// For now, just send buffers over a certain fixed size.
73+
const ITER_CHUNK_SIZE: usize = 64 * 1024;
74+
75+
if self.scratch_space.len() > ITER_CHUNK_SIZE {
76+
self.force_flush();
77+
}
78+
}
79+
7380
/// Finalises the writer and returns all the chunks.
7481
pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
7582
if !self.scratch_space.is_empty() {
76-
// Avoid extra clone by just shrinking and pushing the scratch space
77-
// in-place.
83+
// This is equivalent to calling `force_flush`, but we avoid extra
84+
// clone by just shrinking and pushing the scratch space in-place.
7885
self.chunks.push(self.scratch_space.into());
7986
}
8087
self.chunks
@@ -274,6 +281,10 @@ impl InstanceEnv {
274281
let stdb = &*self.dbic.relational_db;
275282
let tx = &mut *self.tx.get()?;
276283

284+
stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer);
285+
// initial chunk is expected to be schema itself, so force-flush it as a separate chunk
286+
chunked_writer.force_flush();
287+
277288
for row in stdb.iter(ctx, tx, table_id)? {
278289
row.view().encode(&mut chunked_writer);
279290
// Flush at row boundaries.
@@ -319,12 +330,18 @@ impl InstanceEnv {
319330
}
320331
}
321332

333+
let mut chunked_writer = ChunkedWriter::default();
334+
322335
let stdb = &self.dbic.relational_db;
323336
let tx = &mut *self.tx.get()?;
324337

325338
let schema = stdb.schema_for_table(tx, table_id)?;
326339
let row_type = ProductType::from(&*schema);
327340

341+
// write and force flush schema as it's expected to be the first individual chunk
342+
row_type.encode(&mut chunked_writer);
343+
chunked_writer.force_flush();
344+
328345
let filter = filter::Expr::from_bytes(
329346
// TODO: looks like module typespace is currently not hooked up to instances;
330347
// use empty typespace for now which should be enough for primitives
@@ -342,8 +359,6 @@ impl InstanceEnv {
342359
_ => unreachable!("query should always return a table"),
343360
};
344361

345-
let mut chunked_writer = ChunkedWriter::default();
346-
347362
// write all rows and flush at row boundaries
348363
for row in results.data {
349364
row.data.encode(&mut chunked_writer);

crates/core/src/host/wasmer/wasmer_module.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ impl WasmerModule {
4545
WasmerModule { module, engine }
4646
}
4747

48-
pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(7, 0);
48+
pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(6, 0);
4949

5050
fn imports(&self, store: &mut Store, env: &FunctionEnv<WasmInstanceEnv>) -> Imports {
5151
#[allow(clippy::assertions_on_constants)]
5252
const _: () = assert!(WasmerModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION);
5353
imports! {
54-
"spacetime_7.0" => {
54+
"spacetime_6.0" => {
5555
"_schedule_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::schedule_reducer),
5656
"_cancel_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::cancel_reducer),
5757
"_delete_by_col_eq" => Function::new_typed_with_env(

crates/lib/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub use type_value::{AlgebraicValue, ProductValue};
4141

4242
pub use spacetimedb_sats as sats;
4343

44-
pub const MODULE_ABI_MAJOR_VERSION: u16 = 7;
44+
pub const MODULE_ABI_MAJOR_VERSION: u16 = 6;
4545

4646
// if it ends up we need more fields in the future, we can split one of them in two
4747
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)]

0 commit comments

Comments
 (0)