90 changes: 3 additions & 87 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -97,7 +97,6 @@ fs2 = "0.4.3"
 fs-err = "2.9.0"
 futures = "0.3"
 futures-channel = "0.3"
-genawaiter = "0.99.1"
 getrandom = { version = "0.2.7", features = ["custom"] }
 glob = "0.3.1"
 hex = "0.4.3"
1 change: 0 additions & 1 deletion crates/core/Cargo.toml
@@ -37,7 +37,6 @@ email_address.workspace = true
 flate2.workspace = true
 fs2.workspace = true
 futures.workspace = true
-genawaiter.workspace = true
 hex.workspace = true
 hostname.workspace = true
 hyper.workspace = true
124 changes: 77 additions & 47 deletions crates/core/src/host/instance_env.rs
@@ -18,6 +18,7 @@ use spacetimedb_lib::filter::CmpArgs;
 use spacetimedb_lib::identity::AuthCtx;
 use spacetimedb_lib::operator::OpQuery;
 use spacetimedb_lib::relation::{FieldExpr, FieldName};
+use spacetimedb_sats::buffer::BufWriter;
 use spacetimedb_sats::{ProductType, Typespace};
 use spacetimedb_vm::expr::{Code, ColumnOp};
 
@@ -33,6 +34,53 @@ pub struct TxSlot {
     inner: Arc<Mutex<Option<MutTxId>>>,
 }
 
+#[derive(Default)]
+struct ChunkedWriter {
+    chunks: Vec<Box<[u8]>>,
+    scratch_space: Vec<u8>,
+}
+
+impl BufWriter for ChunkedWriter {
+    fn put_slice(&mut self, slice: &[u8]) {
+        self.scratch_space.extend_from_slice(slice);
+    }
+}
+
+impl ChunkedWriter {
+    pub fn force_flush(&mut self) {
+        if !self.scratch_space.is_empty() {
+            // We intentionally clone here so that our scratch space is not
+            // recreated with zero capacity (via `Vec::new`), but instead can
+            // be `.clear()`ed in-place and reused.
+            //
+            // This way the buffers in `chunks` are always fitted to the
+            // actual data they contain, while the scratch space is
+            // ever-growing and has a higher chance of fitting each next row
+            // without reallocation.
+            self.chunks.push(self.scratch_space.as_slice().into());
+            self.scratch_space.clear();
+        }
+    }
+
+    pub fn flush(&mut self) {
+        // For now, just send buffers over a certain fixed size.
+        const ITER_CHUNK_SIZE: usize = 64 * 1024;
+
+        if self.scratch_space.len() > ITER_CHUNK_SIZE {
+            self.force_flush();
+        }
+    }
+
+    pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
+        if !self.scratch_space.is_empty() {
+            // This is equivalent to calling `force_flush`, but we avoid an
+            // extra clone by shrinking and pushing the scratch space in-place.
+            self.chunks.push(self.scratch_space.into());
+        }
+        self.chunks
+    }
+}
+
 // Generic 'instance environment' delegated to from various host types.
 impl InstanceEnv {
     pub fn new(dbic: Arc<DatabaseInstanceContext>, scheduler: Scheduler) -> Self {
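
Note (not part of the diff): a minimal, self-contained sketch of the chunking contract implemented above. `BufWriter` is stubbed down to the single method used here so the sketch compiles on its own; the real `spacetimedb_sats::buffer::BufWriter` may carry more methods, and the row sizes in `main` are illustrative only.

    // Stand-in for spacetimedb_sats::buffer::BufWriter, reduced to one method.
    trait BufWriter {
        fn put_slice(&mut self, slice: &[u8]);
    }

    #[derive(Default)]
    struct ChunkedWriter {
        chunks: Vec<Box<[u8]>>,
        scratch_space: Vec<u8>,
    }

    impl BufWriter for ChunkedWriter {
        fn put_slice(&mut self, slice: &[u8]) {
            self.scratch_space.extend_from_slice(slice);
        }
    }

    impl ChunkedWriter {
        fn force_flush(&mut self) {
            if !self.scratch_space.is_empty() {
                // Clone the scratch space into a fitted buffer, then reuse it.
                self.chunks.push(self.scratch_space.as_slice().into());
                self.scratch_space.clear();
            }
        }

        fn flush(&mut self) {
            const ITER_CHUNK_SIZE: usize = 64 * 1024;
            if self.scratch_space.len() > ITER_CHUNK_SIZE {
                self.force_flush();
            }
        }

        fn into_chunks(mut self) -> Vec<Box<[u8]>> {
            if !self.scratch_space.is_empty() {
                self.chunks.push(self.scratch_space.into());
            }
            self.chunks
        }
    }

    fn main() {
        let mut w = ChunkedWriter::default();

        // The schema is written first and force-flushed into a chunk of its own.
        w.put_slice(b"schema");
        w.force_flush();

        // 10_000 rows of 8 bytes each = 80_000 bytes. flush() runs at every
        // row boundary but only cuts a chunk once the scratch space exceeds
        // 64 KiB, so chunk boundaries always coincide with row boundaries.
        for row in 0u64..10_000 {
            w.put_slice(&row.to_le_bytes());
            w.flush();
        }

        let chunks = w.into_chunks();
        assert_eq!(&*chunks[0], &b"schema"[..]);
        // One just-over-64-KiB row chunk plus a smaller tail from into_chunks().
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[1].len(), 64 * 1024 + 8); // first length > 65536 is 65544
    }

The assertions illustrate the two invariants the rest of the PR relies on: the schema always arrives as its own first chunk, and every later chunk ends exactly on a row boundary.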
@@ -322,57 +370,27 @@ impl InstanceEnv {
     }
 
     #[tracing::instrument(skip_all)]
-    pub fn iter(&self, table_id: u32) -> impl Iterator<Item = Result<Vec<u8>, NodesError>> {
-        use genawaiter::{sync::gen, yield_, GeneratorState};
-
-        // Cheap Arc clones to untie the returned iterator from our own lifetime.
-        let relational_db = self.dbic.relational_db.clone();
-        let tx = self.tx.clone();
+    pub fn iter_chunks(&self, table_id: u32) -> Result<Vec<Box<[u8]>>, NodesError> {
+        let mut chunked_writer = ChunkedWriter::default();
 
-        // For now, just send buffers over a certain fixed size.
-        fn should_yield_buf(buf: &Vec<u8>) -> bool {
-            const SIZE: usize = 64 * 1024;
-            buf.len() >= SIZE
-        }
-
-        let mut generator = Some(gen!({
-            let stdb = &*relational_db;
-            let tx = &mut *tx.get()?;
+        let stdb = &*self.dbic.relational_db;
+        let tx = &mut *self.tx.get()?;
 
-            let mut buf = Vec::new();
-            let schema = stdb.row_schema_for_table(tx, table_id)?;
-            schema.encode(&mut buf);
-            yield_!(buf);
+        stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer);
+        // The initial chunk is expected to be the schema itself, so force-flush it as a separate chunk.
+        chunked_writer.force_flush();
 
-            let mut buf = Vec::new();
-            for row in stdb.iter(tx, table_id)? {
-                if should_yield_buf(&buf) {
-                    yield_!(buf);
-                    buf = Vec::new();
-                }
-                row.view().encode(&mut buf);
-            }
-            if !buf.is_empty() {
-                yield_!(buf)
-            }
+        for row in stdb.iter(tx, table_id)? {
+            row.view().encode(&mut chunked_writer);
+            // Flush at row boundaries.
+            chunked_writer.flush();
+        }
 
-            Ok(())
-        }));
-
-        std::iter::from_fn(move || match generator.as_mut()?.resume() {
-            GeneratorState::Yielded(bytes) => Some(Ok(bytes)),
-            GeneratorState::Complete(res) => {
-                generator = None;
-                match res {
-                    Ok(()) => None,
-                    Err(err) => Some(Err(err)),
-                }
-            }
-        })
+        Ok(chunked_writer.into_chunks())
     }
 
     #[tracing::instrument(skip_all)]
-    pub fn iter_filtered(&self, table_id: u32, filter: &[u8]) -> Result<impl Iterator<Item = Vec<u8>>, NodesError> {
+    pub fn iter_filtered_chunks(&self, table_id: u32, filter: &[u8]) -> Result<Vec<Box<[u8]>>, NodesError> {
         use spacetimedb_lib::filter;
 
         fn filter_to_column_op(table_name: &str, filter: filter::Expr) -> ColumnOp {
@@ -402,11 +420,18 @@ impl InstanceEnv {
             }
         }
 
+        let mut chunked_writer = ChunkedWriter::default();
+
         let stdb = &self.dbic.relational_db;
         let tx = &mut *self.tx.get()?;
 
         let schema = stdb.schema_for_table(tx, table_id)?;
         let row_type = ProductType::from(&*schema);
+
+        // Write and force-flush the schema, as it's expected to be the first individual chunk.
+        row_type.encode(&mut chunked_writer);
+        chunked_writer.force_flush();
+
         let filter = filter::Expr::from_bytes(
             // TODO: looks like module typespace is currently not hooked up to instances;
             // use empty typespace for now which should be enough for primitives
@@ -423,9 +448,14 @@
             Code::Table(table) => table,
             _ => unreachable!("query should always return a table"),
         };
-        Ok(std::iter::once(bsatn::to_vec(&row_type))
-            .chain(results.data.into_iter().map(|row| bsatn::to_vec(&row.data)))
-            .map(|bytes| bytes.expect("encoding algebraic values should never fail")))
+
+        // Write all rows, flushing at row boundaries.
+        for row in results.data {
+            row.data.encode(&mut chunked_writer);
+            chunked_writer.flush();
+        }
+
+        Ok(chunked_writer.into_chunks())
     }
 }

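Note: a rough sketch, not from this PR, of how a caller might walk the `Vec<Box<[u8]>>` returned by `iter_chunks` / `iter_filtered_chunks`. Per the comments in the diff, the first chunk is the encoded row schema (force-flushed separately) and each later chunk holds a whole number of encoded rows. `consume_chunks` is a hypothetical name, and the actual BSATN decoding is elided.

    // Hypothetical consumer of the chunk list.
    fn consume_chunks(chunks: Vec<Box<[u8]>>) {
        let mut chunks = chunks.into_iter();

        // The first chunk is always the encoded row schema, force-flushed
        // into its own buffer; decoding it is elided in this sketch.
        if let Some(schema) = chunks.next() {
            println!("schema chunk: {} bytes", schema.len());
        }

        // Every remaining chunk contains a whole number of encoded rows,
        // because the writer only flushes at row boundaries.
        for (i, chunk) in chunks.enumerate() {
            println!("row chunk {}: {} bytes", i, chunk.len());
        }
    }

    fn main() {
        let chunks: Vec<Box<[u8]>> = vec![
            b"schema".to_vec().into_boxed_slice(),
            b"row-bytes".to_vec().into_boxed_slice(),
        ];
        consume_chunks(chunks);
    }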
2 changes: 1 addition & 1 deletion crates/core/src/host/wasm_common.rs
@@ -281,7 +281,7 @@ impl BufferIdx {
     }
 }
 
-decl_index!(BufferIterIdx => Box<dyn Iterator<Item = Result<bytes::Bytes, NodesError>> + Send + Sync>);
+decl_index!(BufferIterIdx => std::vec::IntoIter<Box<[u8]>>);
 pub(super) type BufferIters = ResourceSlab<BufferIterIdx>;
 
 pub(super) struct TimingSpan {
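Note: a sketch of what the `BufferIterIdx` change buys. With chunks materialized eagerly in `iter_chunks`, the stored iterator can be the concrete, infallible `std::vec::IntoIter<Box<[u8]>>` instead of a boxed `dyn Iterator` yielding `Result`s. The `BufferIters` struct below is a simplified stand-in for wasm_common's `ResourceSlab`, whose real API is not shown in this diff.

    // Simplified stand-in for ResourceSlab<BufferIterIdx> (assumption: the
    // real slab also hands out integer handles to stored iterators).
    struct BufferIters {
        slots: Vec<Option<std::vec::IntoIter<Box<[u8]>>>>,
    }

    impl BufferIters {
        fn insert(&mut self, chunks: Vec<Box<[u8]>>) -> usize {
            self.slots.push(Some(chunks.into_iter()));
            self.slots.len() - 1
        }

        // Advancing can no longer fail mid-stream: all fallible work already
        // happened while building the chunks, so this either yields the next
        // fully-built chunk or signals exhaustion with None.
        fn advance(&mut self, idx: usize) -> Option<Box<[u8]>> {
            self.slots[idx].as_mut()?.next()
        }
    }

    fn main() {
        let mut iters = BufferIters { slots: Vec::new() };
        let idx = iters.insert(vec![b"schema".to_vec().into_boxed_slice()]);
        assert_eq!(iters.advance(idx).as_deref(), Some(&b"schema"[..]));
        assert_eq!(iters.advance(idx), None);
    }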