diff --git a/Cargo.lock b/Cargo.lock index 127a0a5a6eb..e52cc8445bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1682,36 +1682,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "genawaiter-macro", - "genawaiter-proc-macro", - "proc-macro-hack", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - -[[package]] -name = "genawaiter-proc-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" -dependencies = [ - "proc-macro-error 0.4.12", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "generator" version = "0.7.5" @@ -2699,9 +2669,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.26.0+1.1.1u" +version = "111.28.0+1.1.1w" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efc62c9f12b22b8f5208c23a7200a442b2e5999f8bdf80233852122b5a4f6f37" +checksum = "3ce95ee1f6f999dfb95b8afd43ebe442758ea2104d1ccb99a94c30db22ae701f" dependencies = [ "cc", ] @@ -3038,42 +3008,16 @@ dependencies = [ "toml", ] -[[package]] -name = "proc-macro-error" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" -dependencies = [ - "proc-macro-error-attr 0.4.12", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - [[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ - "proc-macro-error-attr 1.0.4", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -dependencies = [ + "proc-macro-error-attr", "proc-macro2", "quote", "syn 1.0.109", - "syn-mid", "version_check", ] @@ -3088,12 +3032,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.63" @@ -4329,7 +4267,6 @@ dependencies = [ "flate2", "fs2", "futures", - "genawaiter", "hex", "hostname", "hyper", @@ -4700,17 +4637,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn-mid" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sync_wrapper" version = "0.1.2" @@ -4770,7 +4696,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9ee618502f497abf593e1c5c9577f34775b111480009ffccd7ad70d23fcaba8" dependencies = [ "heck", - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn 1.0.109", @@ -5665,7 +5591,7 @@ version = "4.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25bb1425c9e4dc3e2d3aacd6e82e22e27a3127379e0d09bcbdf25ff376229162" dependencies = [ - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn 1.0.109", diff --git a/Cargo.toml b/Cargo.toml index bdeea7e7ff0..4b739603f0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,6 @@ fs2 = "0.4.3" fs-err = "2.9.0" futures = "0.3" futures-channel = "0.3" -genawaiter = "0.99.1" getrandom = { version = "0.2.7", features = ["custom"] } glob = "0.3.1" hex = "0.4.3" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 838eccd5ecf..7abf70c654f 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -37,7 +37,6 @@ email_address.workspace = true flate2.workspace = true fs2.workspace = true futures.workspace = true -genawaiter.workspace = true hex.workspace = true hostname.workspace = true hyper.workspace = true diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 7ad65addfa8..9d67936ca84 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -18,6 +18,7 @@ use spacetimedb_lib::filter::CmpArgs; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::operator::OpQuery; use spacetimedb_lib::relation::{FieldExpr, FieldName}; +use spacetimedb_sats::buffer::BufWriter; use spacetimedb_sats::{ProductType, Typespace}; use spacetimedb_vm::expr::{Code, ColumnOp}; @@ -33,6 +34,57 @@ pub struct TxSlot { inner: Arc>>, } +#[derive(Default)] +struct ChunkedWriter { + chunks: Vec>, + scratch_space: Vec, +} + +impl BufWriter for ChunkedWriter { + fn put_slice(&mut self, slice: &[u8]) { + self.scratch_space.extend_from_slice(slice); + } +} + +impl ChunkedWriter { + /// Flushes the currently populated part of the scratch space as a new chunk. + pub fn force_flush(&mut self) { + if !self.scratch_space.is_empty() { + // We intentionally clone here so that our scratch space is not + // recreated with zero capacity (via `Vec::new`), but instead can + // be `.clear()`ed in-place and reused. + // + // This way the buffers in `chunks` are always fitted fixed-size to + // the actual data they contain, while the scratch space is ever- + // growing and has higher chance of fitting each next row without + // reallocation. + self.chunks.push(self.scratch_space.as_slice().into()); + self.scratch_space.clear(); + } + } + + /// Similar to [`Self::force_flush`], but only flushes if the data in the + /// scratch space is larger than our chunking threshold. + pub fn flush(&mut self) { + // For now, just send buffers over a certain fixed size. + const ITER_CHUNK_SIZE: usize = 64 * 1024; + + if self.scratch_space.len() > ITER_CHUNK_SIZE { + self.force_flush(); + } + } + + /// Finalises the writer and returns all the chunks. + pub fn into_chunks(mut self) -> Vec> { + if !self.scratch_space.is_empty() { + // This is equivalent to calling `force_flush`, but we avoid extra + // clone by just shrinking and pushing the scratch space in-place. + self.chunks.push(self.scratch_space.into()); + } + self.chunks + } +} + // Generic 'instance environment' delegated to from various host types. impl InstanceEnv { pub fn new(dbic: Arc, scheduler: Scheduler) -> Self { @@ -322,57 +374,27 @@ impl InstanceEnv { } #[tracing::instrument(skip_all)] - pub fn iter(&self, table_id: u32) -> impl Iterator, NodesError>> { - use genawaiter::{sync::gen, yield_, GeneratorState}; - - // Cheap Arc clones to untie the returned iterator from our own lifetime. - let relational_db = self.dbic.relational_db.clone(); - let tx = self.tx.clone(); + pub fn iter_chunks(&self, table_id: u32) -> Result>, NodesError> { + let mut chunked_writer = ChunkedWriter::default(); - // For now, just send buffers over a certain fixed size. - fn should_yield_buf(buf: &Vec) -> bool { - const SIZE: usize = 64 * 1024; - buf.len() >= SIZE - } - - let mut generator = Some(gen!({ - let stdb = &*relational_db; - let tx = &mut *tx.get()?; + let stdb = &*self.dbic.relational_db; + let tx = &mut *self.tx.get()?; - let mut buf = Vec::new(); - let schema = stdb.row_schema_for_table(tx, table_id)?; - schema.encode(&mut buf); - yield_!(buf); + stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer); + // initial chunk is expected to be schema itself, so force-flush it as a separate chunk + chunked_writer.force_flush(); - let mut buf = Vec::new(); - for row in stdb.iter(tx, table_id)? { - if should_yield_buf(&buf) { - yield_!(buf); - buf = Vec::new(); - } - row.view().encode(&mut buf); - } - if !buf.is_empty() { - yield_!(buf) - } + for row in stdb.iter(tx, table_id)? { + row.view().encode(&mut chunked_writer); + // Flush at row boundaries. + chunked_writer.flush(); + } - Ok(()) - })); - - std::iter::from_fn(move || match generator.as_mut()?.resume() { - GeneratorState::Yielded(bytes) => Some(Ok(bytes)), - GeneratorState::Complete(res) => { - generator = None; - match res { - Ok(()) => None, - Err(err) => Some(Err(err)), - } - } - }) + Ok(chunked_writer.into_chunks()) } #[tracing::instrument(skip_all)] - pub fn iter_filtered(&self, table_id: u32, filter: &[u8]) -> Result>, NodesError> { + pub fn iter_filtered_chunks(&self, table_id: u32, filter: &[u8]) -> Result>, NodesError> { use spacetimedb_lib::filter; fn filter_to_column_op(table_name: &str, filter: filter::Expr) -> ColumnOp { @@ -402,11 +424,18 @@ impl InstanceEnv { } } + let mut chunked_writer = ChunkedWriter::default(); + let stdb = &self.dbic.relational_db; let tx = &mut *self.tx.get()?; let schema = stdb.schema_for_table(tx, table_id)?; let row_type = ProductType::from(&*schema); + + // write and force flush schema as it's expected to be the first individual chunk + row_type.encode(&mut chunked_writer); + chunked_writer.force_flush(); + let filter = filter::Expr::from_bytes( // TODO: looks like module typespace is currently not hooked up to instances; // use empty typespace for now which should be enough for primitives @@ -423,9 +452,14 @@ impl InstanceEnv { Code::Table(table) => table, _ => unreachable!("query should always return a table"), }; - Ok(std::iter::once(bsatn::to_vec(&row_type)) - .chain(results.data.into_iter().map(|row| bsatn::to_vec(&row.data))) - .map(|bytes| bytes.expect("encoding algebraic values should never fail"))) + + // write all rows and flush at row boundaries + for row in results.data { + row.data.encode(&mut chunked_writer); + chunked_writer.flush(); + } + + Ok(chunked_writer.into_chunks()) } } diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index cec5a5e7bef..0069f010ca6 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -281,7 +281,7 @@ impl BufferIdx { } } -decl_index!(BufferIterIdx => Box> + Send + Sync>); +decl_index!(BufferIterIdx => std::vec::IntoIter>); pub(super) type BufferIters = ResourceSlab; pub(super) struct TimingSpan { diff --git a/crates/core/src/host/wasmer/wasm_instance_env.rs b/crates/core/src/host/wasmer/wasm_instance_env.rs index 93ba9dcf203..b6d3ac794e1 100644 --- a/crates/core/src/host/wasmer/wasm_instance_env.rs +++ b/crates/core/src/host/wasmer/wasm_instance_env.rs @@ -7,8 +7,6 @@ use crate::host::wasm_common::{ err_to_errno, AbiRuntimeError, BufferIdx, BufferIterIdx, BufferIters, Buffers, TimingSpan, TimingSpanIdx, TimingSpanSet, }; -use bytes::Bytes; -use itertools::Itertools; use wasmer::{FunctionEnvMut, MemoryAccessError, RuntimeError, ValueType, WasmPtr}; use crate::host::instance_env::InstanceEnv; @@ -554,14 +552,12 @@ impl WasmInstanceEnv { // #[tracing::instrument(skip_all)] pub fn iter_start(caller: FunctionEnvMut<'_, Self>, table_id: u32, out: WasmPtr) -> RtResult { Self::cvt_ret(caller, "iter_start", out, |mut caller, _mem| { - // Construct the iterator. - let iter = caller.data().instance_env.iter(table_id); - // TODO: make it so the above iterator doesn't lock the database for its whole lifetime - let iter = iter.map_ok(Bytes::from).collect::>().into_iter(); + // Collect the iterator chunks. + let chunks = caller.data().instance_env.iter_chunks(table_id)?; // Register the iterator and get back the index to write to `out`. // Calls to the iterator are done through dynamic dispatch. - Ok(caller.data_mut().iters.insert(Box::new(iter))) + Ok(caller.data_mut().iters.insert(chunks.into_iter())) }) } @@ -591,13 +587,11 @@ impl WasmInstanceEnv { let filter = caller.data().mem().read_bytes(&caller, filter, filter_len)?; // Construct the iterator. - let iter = caller.data().instance_env.iter_filtered(table_id, &filter)?; - // TODO: make it so the above iterator doesn't lock the database for its whole lifetime - let iter = iter.map(Bytes::from).map(Ok).collect::>().into_iter(); + let chunks = caller.data().instance_env.iter_filtered_chunks(table_id, &filter)?; // Register the iterator and get back the index to write to `out`. // Calls to the iterator are done through dynamic dispatch. - Ok(caller.data_mut().iters.insert(Box::new(iter))) + Ok(caller.data_mut().iters.insert(chunks.into_iter())) }) } @@ -624,11 +618,9 @@ impl WasmInstanceEnv { .ok_or_else(|| RuntimeError::new("no such iterator"))?; // Advance the iterator. - match iter.next() { - Some(Ok(buf)) => Ok(data_mut.buffers.insert(buf)), - Some(Err(err)) => Err(err.into()), - None => Ok(BufferIdx::INVALID), - } + Ok(iter + .next() + .map_or(BufferIdx::INVALID, |buf| data_mut.buffers.insert(buf.into()))) }) }