7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -187,6 +187,7 @@ rustc-hash = "1.1.0"
rustyline = { version = "12.0.0", features = [] }
scoped-tls = "1.0.1"
scopeguard = "1.1.0"
second-stack = "0.3"
sendgrid = "0.21"
serde = "1.0.136"
serde_json = { version = "1.0.87", features = ["raw_value"] }
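The new second-stack dependency provides uninit_slice, which leases an uninitialized scratch slice from a thread-local "second stack" and hands it to a closure, avoiding a fresh heap allocation per call. A minimal sketch of that API, independent of this PR (the with_scratch helper and the zero-fill are illustrative only):

use core::mem::MaybeUninit;
use second_stack::uninit_slice;

// Borrow `len` scratch bytes from the thread-local second stack, initialize
// them, and run `f` over the initialized view. The buffer is reclaimed when
// the closure returns.
fn with_scratch<R>(len: usize, f: impl FnOnce(&[u8]) -> R) -> R {
    uninit_slice(len, |buf: &mut [MaybeUninit<u8>]| {
        for byte in buf.iter_mut() {
            byte.write(0); // every element must be initialized before reading
        }
        // SAFETY: all `len` bytes were just initialized above.
        let bytes = unsafe { &*(buf as *const [MaybeUninit<u8>] as *const [u8]) };
        f(bytes)
    })
}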
8 changes: 7 additions & 1 deletion crates/bench/benches/special.rs
@@ -97,7 +97,13 @@ fn serialize_benchmarks<T: BenchTable + RandomTable>(c: &mut Criterion) {
let ptrs = data_pv
.elements
.iter()
.map(|row| table.insert(&mut blob_store, row.as_product().unwrap()).unwrap().1)
.map(|row| {
table
.insert(&mut blob_store, row.as_product().unwrap())
.unwrap()
.1
.pointer()
})
.collect::<Vec<_>>();
let refs = ptrs
.into_iter()
9 changes: 5 additions & 4 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
@@ -780,11 +780,12 @@ impl MutTxId {
.ok_or(TableError::IdNotFoundState(table_id))?;

match tx_table.insert(tx_blob_store, row) {
Ok((hash, ptr)) => {
Ok((hash, row_ref)) => {
// `row` not previously present in insert tables,
// but may still be a set-semantic conflict with a row
// in the committed state.

let ptr = row_ref.pointer();
if let Some(commit_table) = commit_table {
// Safety:
// - `commit_table` and `tx_table` use the same schema
@@ -912,9 +913,9 @@ impl MutTxId {
"Table::insert_internal_allow_duplicates returned error of unexpected variant: {:?}",
e
),
Ok(ptr) => {
// Safety: `ptr` must be valid, because we just inserted it and haven't deleted it since.
let hash = unsafe { tx_table.row_hash_for(ptr) };
Ok(row_ref) => {
let hash = row_ref.row_hash();
let ptr = row_ref.pointer();

// First, check if a matching row exists in the `tx_table`.
// If it does, no need to check the `commit_table`.
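The pattern above is the heart of the PR: insert now hands back a RowRef, a handle whose lifetime is tied to the table, so row_hash() needs no unsafe validity argument, and callers detach a raw RowPointer via .pointer() only when they need one. A self-contained toy analogue of that design (these types are illustrative, not SpacetimeDB's real ones):

use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

#[derive(Clone, Copy)]
struct RowPointer(usize);

struct Table {
    rows: Vec<Vec<u8>>,
}

// A reference-like handle: valid by construction for as long as it borrows the table.
#[derive(Clone, Copy)]
struct RowRef<'a> {
    table: &'a Table,
    ptr: RowPointer,
}

impl<'a> RowRef<'a> {
    fn pointer(&self) -> RowPointer {
        self.ptr
    }
    fn row_hash(&self) -> u64 {
        // Safe: the borrow guarantees the row still exists.
        let mut h = RandomState::new().build_hasher();
        self.table.rows[self.ptr.0].hash(&mut h);
        h.finish()
    }
}

impl Table {
    // Returning a borrowed handle instead of a bare pointer moves the
    // validity proof from every call site into the type system.
    fn insert(&mut self, row: Vec<u8>) -> RowRef<'_> {
        self.rows.push(row);
        let ptr = RowPointer(self.rows.len() - 1);
        RowRef { table: self, ptr }
    }
}

The cost, visible in the benchmark changes below, is that the returned handle keeps the table (and blob store) borrowed until it is dropped.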
1 change: 1 addition & 0 deletions crates/sats/Cargo.toml
@@ -29,6 +29,7 @@ itertools.workspace = true
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }
sha3.workspace = true
second-stack.workspace = true
serde = { workspace = true, optional = true }
smallvec.workspace = true
thiserror.workspace = true
98 changes: 79 additions & 19 deletions crates/sats/src/algebraic_value/ser.rs
@@ -1,6 +1,9 @@
use second_stack::uninit_slice;

use crate::ser::{self, ForwardNamedToSeqProduct, Serialize};
use crate::{AlgebraicType, AlgebraicValue, ArrayValue, MapValue, F32, F64};
use core::convert::Infallible;
use core::mem::MaybeUninit;
use core::ptr;
use std::alloc::{self, Layout};

@@ -96,10 +99,12 @@ impl ser::Serializer for ValueSerializer {
chunks: I,
) -> Result<Self::Ok, Self::Error> {
// SAFETY: Caller promised `total_bsatn_len == chunks.map(|c| c.len()).sum() <= isize::MAX`.
let bsatn = unsafe { concat_byte_chunks(total_bsatn_len, chunks) };

// SAFETY: Caller promised `AlgebraicValue::decode(ty, &mut bytes).is_ok()`.
unsafe { self.serialize_bsatn(ty, &bsatn) }
unsafe {
concat_byte_chunks_buf(total_bsatn_len, chunks, |bsatn| {
// SAFETY: Caller promised `AlgebraicValue::decode(ty, &mut bytes).is_ok()`.
ValueSerializer.serialize_bsatn(ty, bsatn)
})
}
}

unsafe fn serialize_str_in_chunks<'a, I: Iterator<Item = &'a [u8]>>(
@@ -136,33 +141,88 @@ unsafe fn concat_byte_chunks<'a>(total_len: usize, chunks: impl Iterator<Item =
alloc::handle_alloc_error(layout);
}

// Copy over each `chunk`.
// SAFETY:
// 1. `ptr` is valid for writes as we own it
// and the caller promised that all `chunk`s will fit in `total_len`.
// 2. `ptr` points to a new allocation so it cannot overlap with any in `chunks`.
unsafe { write_byte_chunks(ptr, chunks) };

// Convert allocation to a `Vec<u8>`.
// SAFETY:
// - `ptr` was allocated using global allocator.
// - `u8` and `ptr`'s allocation both have alignment of 1.
// - `ptr`'s allocation is `total_len <= isize::MAX`.
// - `total_len <= total_len` holds.
// - `total_len` values were initialized at type `u8`
// as we know `total_len == chunks.map(|c| c.len()).sum()`.
unsafe { Vec::from_raw_parts(ptr, total_len, total_len) }
}

/// Concatenates `chunks`, which together must be `total_len` bytes long, into a scratch buffer and passes the result to `run`.
///
/// # Safety
///
/// - `total_len == chunks.map(|c| c.len()).sum() <= isize::MAX`
pub unsafe fn concat_byte_chunks_buf<'a, R>(
total_len: usize,
chunks: impl Iterator<Item = &'a [u8]>,
run: impl FnOnce(&[u8]) -> R,
) -> R {
uninit_slice(total_len, |buf: &mut [MaybeUninit<u8>]| {
let dst = buf.as_mut_ptr().cast();
debug_assert_eq!(total_len, buf.len());
// SAFETY:
// 1. `buf.len() == total_len`
// 2. `buf` cannot overlap with anything yielded by `chunks`.
unsafe { write_byte_chunks(dst, chunks) }
// SAFETY: Every byte of `buf` was initialized in the previous call
// as we know that `total_len == chunks.map(|c| c.len()).sum()`.
let bytes = unsafe { slice_assume_init_ref(buf) };
run(bytes)
})
}

/// Copies over each `chunk` in `chunks` to `dst`, writing `total_len` bytes to `dst`.
///
/// # Safety
///
/// Let `total_len == chunks.map(|c| c.len()).sum()`.
/// 1. `dst` must be valid for writes for `total_len` bytes.
/// 2. `dst..(dst + total_len)` does not overlap with any slice yielded by `chunks`.
unsafe fn write_byte_chunks<'a>(mut dst: *mut u8, chunks: impl Iterator<Item = &'a [u8]>) {
// Copy over each `chunk`, moving `dst` forward by `chunk.len()` each time.
let mut dst = ptr;
for chunk in chunks {
let len = chunk.len();
// SAFETY:
// - `chunk` is valid for reads for `len` bytes.
// - `dst` is valid for writes as we own it
// and as (1) caller promised that all `chunk`s will fit in `total_len`,
// this entails that `dst..dst + len` is always in bounds of the allocation.
// - By line above, `chunk` is valid for reads for `len` bytes.
// - By (1) `dst` is valid for writes as promised by caller
// and that all `chunk`s will fit in `total_len`.
// This entails that `dst..dst + len` is always in bounds of the allocation.
// - `chunk` and `dst` are trivially properly aligned (`align_of::<u8>() == 1`).
// - The allocation `ptr` points to is new so derived pointers cannot overlap with `chunk`.
// - By (2) derived pointers of `dst` cannot overlap with `chunk`.
unsafe {
ptr::copy_nonoverlapping(chunk.as_ptr(), dst, len);
}
// SAFETY: Same as (1).
dst = unsafe { dst.add(len) };
}
}

// Convert allocation to a `Vec<u8>`.
// SAFETY:
// - `ptr` was allocated using global allocator.
// - `u8` and `ptr`'s allocation both have alignment of 1.
// - `ptr`'s allocation is `total_len <= isize::MAX`.
// - `total_len <= total_len` holds.
// - `total_len` values were initialized at type `u8`
// as we know `total_len == chunks.map(|c| c.len()).sum()`.
unsafe { Vec::from_raw_parts(ptr, total_len, total_len) }
/// Convert a `[MaybeUninit<T>]` into a `[T]` by asserting all elements are initialized.
///
/// Identical copy of the source of `MaybeUninit::slice_assume_init_ref`, which is not yet stabilized.
/// https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.slice_assume_init_ref
///
/// # Safety
///
/// All elements of `slice` must be initialized.
pub const unsafe fn slice_assume_init_ref<T>(slice: &[MaybeUninit<T>]) -> &[T] {
// SAFETY: casting `slice` to a `*const [T]` is safe since the caller guarantees that
// `slice` is initialized, and `MaybeUninit` is guaranteed to have the same layout as `T`.
// The pointer obtained is valid since it refers to memory owned by `slice` which is a
// reference and thus guaranteed to be valid for reads.
unsafe { &*(slice as *const [MaybeUninit<T>] as *const [T]) }
}

/// Continuation for serializing an array.
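concat_byte_chunks_buf is the buffer-reusing replacement for concat_byte_chunks: instead of materializing a Vec<u8>, it concatenates the chunks into a second-stack scratch buffer and runs a closure over the contiguous bytes. A hedged usage sketch (the module path is inferred from the imports elsewhere in this diff):

use spacetimedb_sats::algebraic_value::ser::concat_byte_chunks_buf;

fn chunks_equal_foobar() -> bool {
    let chunks: [&[u8]; 3] = [b"foo", b"ba", b"r"];
    let total_len = chunks.iter().map(|c| c.len()).sum::<usize>();
    // SAFETY: `total_len` is exactly `chunks.map(|c| c.len()).sum()`,
    // which is far below `isize::MAX`.
    unsafe { concat_byte_chunks_buf(total_len, chunks.iter().copied(), |bytes| bytes == b"foobar") }
}

Compared with concat_byte_chunks, which returns an owned Vec<u8>, the concatenated bytes here live only for the duration of the closure.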
1 change: 1 addition & 0 deletions crates/table/Cargo.toml
@@ -48,6 +48,7 @@ proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }

[dev-dependencies]
spacetimedb-sats = { workspace = true, features = ["proptest"] }
criterion.workspace = true
proptest.workspace = true
proptest-derive.workspace = true
10 changes: 9 additions & 1 deletion crates/table/benches/page.rs
@@ -770,7 +770,15 @@ fn hash_in_page(c: &mut Criterion) {
group.bench_function(name, |b| {
let mut hasher = RowHash::hasher_builder().build_hasher();
b.iter(|| {
unsafe { hash_row_in_page(&mut hasher, black_box(&page), black_box(offset), black_box(&ty)) };
unsafe {
hash_row_in_page(
&mut hasher,
black_box(&page),
black_box(&NullBlobStore),
black_box(offset),
black_box(&ty),
)
};
black_box(&mut hasher);
});
});
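The extra black_box(&NullBlobStore) argument reflects a signature change to hash_row_in_page, which now takes a blob store; presumably this lets row hashing resolve large-blob var-len references through the blob store, the same lookup vlr_blob_bytes performs during serialization in bflatn_from.rs further down.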
45 changes: 22 additions & 23 deletions crates/table/benches/page_manager.rs
@@ -524,16 +524,19 @@ fn use_type_throughput<T>(group: &mut BenchmarkGroup<'_, impl Measurement>) {

fn table_insert_one_row(c: &mut Criterion) {
fn bench_insert_row<R: Row>(group: Group<'_, '_>, val: R, name: &str) {
let mut table = make_table_for_row_type::<R>(name);
let table = make_table_for_row_type::<R>(name);
let val = black_box(val.to_product());

// Insert before benching to alloc and fault in a page.
let ptr = table.insert(&mut NullBlobStore, &val).unwrap().1;
let pre = |_, table: &mut Table| {
table.delete(&mut NullBlobStore, ptr).unwrap();
let mut ctx = (table, NullBlobStore);
let ptr = ctx.0.insert(&mut ctx.1, &val).unwrap().1.pointer();
let pre = |_, (table, bs): &mut (Table, NullBlobStore)| {
table.delete(bs, ptr).unwrap();
};
group.bench_function(name, |b| {
iter_time_with(b, &mut table, pre, |_, _, table| table.insert(&mut NullBlobStore, &val));
iter_time_with(b, &mut ctx, pre, |_, _, (table, bs)| {
table.insert(bs, &val).map(|r| r.1.pointer())
});
});
}

@@ -571,16 +574,15 @@ fn table_insert_one_row(c: &mut Criterion) {

fn table_delete_one_row(c: &mut Criterion) {
fn bench_delete_row<R: Row>(group: Group<'_, '_>, val: R, name: &str) {
let mut table = make_table_for_row_type::<R>(name);
let table = make_table_for_row_type::<R>(name);
let val = val.to_product();

// Insert before benching to alloc and fault in a page.
let insert = |_, table: &mut Table| table.insert(&mut NullBlobStore, &val).unwrap().1;
let mut ctx = (table, NullBlobStore);
let insert = |_: u64, (table, bs): &mut (Table, NullBlobStore)| table.insert(bs, &val).unwrap().1.pointer();

group.bench_function(name, |b| {
iter_time_with(b, &mut table, insert, |ptr, _, table| {
table.delete(&mut NullBlobStore, ptr)
});
iter_time_with(b, &mut ctx, insert, |row, _, (table, bs)| table.delete(bs, row));
});
}

@@ -621,16 +623,10 @@ fn table_extract_one_row(c: &mut Criterion) {
let mut table = make_table_for_row_type::<R>(name);
let val = val.to_product();

let ptr = table.insert(&mut NullBlobStore, &val).unwrap().1;
let mut blob_store = NullBlobStore;
let row = black_box(table.insert(&mut blob_store, &val).unwrap().1);
group.bench_function(name, |b| {
b.iter_with_large_drop(|| {
black_box(
black_box(&table)
.get_row_ref(&NullBlobStore, black_box(ptr))
.unwrap()
.to_product_value(),
)
});
b.iter_with_large_drop(|| black_box(row.to_product_value()));
});
}

@@ -760,7 +756,7 @@ fn insert_num_same<R: IndexedRow>(
if let Some(slot) = row.elements.get_mut(1) {
*slot = n.into();
}
tbl.insert(&mut NullBlobStore, &row).map(|(_, ptr)| ptr).ok()
tbl.insert(&mut NullBlobStore, &row).map(|(_, row)| row.pointer()).ok()
})
.last()
.flatten()
@@ -815,18 +811,21 @@ fn index_insert(c: &mut Criterion) {
same_ratio: f64,
) {
let make_row_move = &mut make_row;
let (mut tbl, num_same, _) = make_table_with_same_ratio::<R>(make_row_move, num_rows, same_ratio);
let (tbl, num_same, _) = make_table_with_same_ratio::<R>(make_row_move, num_rows, same_ratio);
let mut ctx = (tbl, NullBlobStore);

group.bench_with_input(
bench_id_for_index(name, num_rows, same_ratio, num_same),
&num_rows,
|b, &num_rows| {
let pre = |_, tbl: &mut Table| {
let pre = |_, (tbl, _): &mut (Table, NullBlobStore)| {
clear_all_same::<R>(tbl, num_rows);
insert_num_same(tbl, || make_row(num_rows), num_same - 1);
make_row(num_rows).to_product()
};
iter_time_with(b, &mut tbl, pre, |row, _, tbl| tbl.insert(&mut NullBlobStore, &row));
iter_time_with(b, &mut ctx, pre, |row, _, (tbl, bs)| {
tbl.insert(bs, &row).map(|r| r.1.pointer())
});
},
);
}
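These benchmark rewrites all follow from the borrow described earlier: because the RowRef returned by insert borrows both the table and the blob store, the two now travel together in one (Table, NullBlobStore) tuple so the iteration closures can reborrow both halves mutably at once. A toy sketch of that borrow pattern (illustrative types only):

struct Store;
struct Tbl(Vec<u32>);

struct Ref<'a> {
    tbl: &'a Tbl,
    _store: &'a Store,
    idx: usize,
}

impl Tbl {
    // The returned handle borrows *both* arguments.
    fn insert<'a>(&'a mut self, store: &'a mut Store, v: u32) -> Ref<'a> {
        self.0.push(v);
        let idx = self.0.len() - 1;
        Ref { tbl: self, _store: store, idx }
    }
}

fn bench_step(ctx: &mut (Tbl, Store)) -> usize {
    // Destructuring one `&mut` context yields disjoint `&mut` borrows of
    // both halves, which is what `insert` needs.
    let (tbl, store) = ctx;
    // Extracting the plain index ends both borrows before the next iteration.
    tbl.insert(store, 7).idx
}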
8 changes: 8 additions & 0 deletions crates/table/proptest-regressions/row_hash.txt
@@ -0,0 +1,8 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 3e55b94365a0ae7698bb9e89259f3f5b84227b1c5ba2f0737be0360f55256c26 # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Sum(SumType { variants: [SumTypeVariant { name: None, algebraic_type: Builtin(Bool) }] }) }] }, ProductValue { elements: [Sum(SumValue { tag: 0, value: Bool(false) })] })
cc aedcfc0fa45005cb11fa8b47f668a8b68c99adadfb50fdc6219840aa8ffd83f6 # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Sum(SumType { variants: [SumTypeVariant { name: None, algebraic_type: Builtin(I32) }, SumTypeVariant { name: None, algebraic_type: Builtin(F64) }, SumTypeVariant { name: None, algebraic_type: Builtin(String) }, SumTypeVariant { name: None, algebraic_type: Builtin(U16) }, SumTypeVariant { name: None, algebraic_type: Builtin(U16) }, SumTypeVariant { name: None, algebraic_type: Builtin(String) }, SumTypeVariant { name: None, algebraic_type: Builtin(Bool) }, SumTypeVariant { name: None, algebraic_type: Builtin(I16) }, SumTypeVariant { name: None, algebraic_type: Builtin(I16) }, SumTypeVariant { name: None, algebraic_type: Builtin(I64) }, SumTypeVariant { name: None, algebraic_type: Builtin(I128) }, SumTypeVariant { name: None, algebraic_type: Builtin(U64) }] }) }] }, ProductValue { elements: [Sum(SumValue { tag: 2, value: String("יּ/Ⱥ🂠છrÔ") })] })
2 changes: 1 addition & 1 deletion crates/table/src/bflatn_from.rs
@@ -305,7 +305,7 @@ unsafe fn serialize_bsatn<S: Serializer>(
let vlr = unsafe { read_from_bytes::<VarLenRef>(bytes, curr_offset) };

if vlr.is_large_blob() {
// SAFETY: As `vlr` a blob, `vlr.first_granule` always points to a valid granule.
// SAFETY: As `vlr` is a blob, `vlr.first_granule` always points to a valid granule.
let blob = unsafe { vlr_blob_bytes(page, blob_store, vlr) };
// SAFETY: The BSATN in `blob` is encoded from an `AlgebraicValue`.
unsafe { ser.serialize_bsatn(ty, blob) }
9 changes: 5 additions & 4 deletions crates/table/src/bflatn_to_bsatn_fast_path.rs
@@ -26,8 +26,9 @@ use crate::{
AlgebraicTypeLayout, HasLayout, PrimitiveType, ProductTypeElementLayout, ProductTypeLayout, RowTypeLayout,
SumTypeLayout, SumTypeVariantLayout,
},
util::{range_move, slice_assume_init_ref},
util::range_move,
};
use spacetimedb_sats::algebraic_value::ser::slice_assume_init_ref;

/// A precomputed BSATN layout for a type whose encoded length is a known constant,
/// enabling fast BFLATN -> BSATN conversion.
@@ -482,13 +483,13 @@ mod test {
return Err(TestCaseError::reject("Var-length type"));
};

let (_, ptr) = table.insert(&mut blob_store, &val).unwrap();
let size = table.row_layout().size();
let (_, row_ref) = table.insert(&mut blob_store, &val).unwrap();

let row_ref = table.get_row_ref(&blob_store, ptr).unwrap();
let slow_path = bsatn::to_vec(&row_ref).unwrap();

let (page, offset) = row_ref.page_and_offset();
let bytes = page.get_row_data(offset, table.row_layout().size());
let bytes = page.get_row_data(offset, size);

let mut fast_path = vec![0u8; bsatn_layout.bsatn_length as usize];
unsafe {