
Commit fd44242

1. Add `Hash` for `RowRef` and make it consistent with hashing the equivalent `ProductValue` (PV). (#1163)
2. Make `RowRef::row_hash` use the above.
3. Make `Table::insert` return a `RowRef`.
4. Use less `unsafe` because of 1-3.
5. Use `second-stack` to reuse temporary allocations in hashing and serialization.
1 parent 5cc05b1 commit fd44242
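For orientation, the call sites changed below now follow roughly this shape. This is a hedged sketch inferred from the diffs in this commit, not the crate's exact signatures; `table`, `blob_store`, and `row` are placeholder bindings:

// Sketch only: the concrete types of `table`, `blob_store`, and `row` are
// whatever this commit declares; the shape matches the call sites below.
let (hash, row_ref) = table.insert(blob_store, row)?;
let ptr = row_ref.pointer();         // plain `RowPointer`, kept after the borrow ends
let hash_again = row_ref.row_hash(); // safe; replaces `unsafe { table.row_hash_for(ptr) }`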

File tree: 20 files changed (+321 / -176 lines)


Cargo.lock

Lines changed: 7 additions & 0 deletions
Generated lockfile; diff not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -187,6 +187,7 @@ rustc-hash = "1.1.0"
 rustyline = { version = "12.0.0", features = [] }
 scoped-tls = "1.0.1"
 scopeguard = "1.1.0"
+second-stack = "0.3"
 sendgrid = "0.21"
 serde = "1.0.136"
 serde_json = { version = "1.0.87", features = ["raw_value"] }

crates/bench/benches/special.rs

Lines changed: 7 additions & 1 deletion
@@ -97,7 +97,13 @@ fn serialize_benchmarks<T: BenchTable + RandomTable>(c: &mut Criterion) {
     let ptrs = data_pv
         .elements
         .iter()
-        .map(|row| table.insert(&mut blob_store, row.as_product().unwrap()).unwrap().1)
+        .map(|row| {
+            table
+                .insert(&mut blob_store, row.as_product().unwrap())
+                .unwrap()
+                .1
+                .pointer()
+        })
         .collect::<Vec<_>>();
     let refs = ptrs
         .into_iter()

crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs

Lines changed: 5 additions & 4 deletions
@@ -780,11 +780,12 @@ impl MutTxId {
             .ok_or(TableError::IdNotFoundState(table_id))?;

         match tx_table.insert(tx_blob_store, row) {
-            Ok((hash, ptr)) => {
+            Ok((hash, row_ref)) => {
                 // `row` not previously present in insert tables,
                 // but may still be a set-semantic conflict with a row
                 // in the committed state.

+                let ptr = row_ref.pointer();
                 if let Some(commit_table) = commit_table {
                     // Safety:
                     // - `commit_table` and `tx_table` use the same schema
@@ -912,9 +913,9 @@ impl MutTxId {
                "Table::insert_internal_allow_duplicates returned error of unexpected variant: {:?}",
                e
            ),
-            Ok(ptr) => {
-                // Safety: `ptr` must be valid, because we just inserted it and haven't deleted it since.
-                let hash = unsafe { tx_table.row_hash_for(ptr) };
+            Ok(row_ref) => {
+                let hash = row_ref.row_hash();
+                let ptr = row_ref.pointer();

                // First, check if a matching row exists in the `tx_table`.
                // If it does, no need to check the `commit_table`.
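The safe `row_ref.row_hash()` call above is what item 1 of the commit message enables: hashing a row through its in-table reference must agree with hashing the equivalent owned value, or the set-semantic conflict checks would miss matches. As a small, self-contained illustration of that consistency requirement (using hypothetical `OwnedRow`/`RowView` types, not the real `ProductValue`/`RowRef` code):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Owned value and a borrowed "view" of the same logical row.
#[derive(Hash)]
struct OwnedRow {
    id: u32,
    name: String,
}

struct RowView<'a> {
    id: u32,
    name: &'a str,
}

// For hash-based lookups to work across both forms, the view must hash
// exactly the same fields, in the same order, as the owned value.
impl Hash for RowView<'_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.name.hash(state);
    }
}

fn hash_of<T: Hash>(t: &T) -> u64 {
    let mut h = DefaultHasher::new();
    t.hash(&mut h);
    h.finish()
}

fn main() {
    let owned = OwnedRow { id: 7, name: "abc".to_owned() };
    let view = RowView { id: owned.id, name: &owned.name };
    assert_eq!(hash_of(&owned), hash_of(&view));
}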

crates/sats/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@ itertools.workspace = true
 proptest = { workspace = true, optional = true }
 proptest-derive = { workspace = true, optional = true }
 sha3.workspace = true
+second-stack.workspace = true
 serde = { workspace = true, optional = true }
 smallvec.workspace = true
 thiserror.workspace = true

crates/sats/src/algebraic_value/ser.rs

Lines changed: 79 additions & 19 deletions
@@ -1,6 +1,9 @@
+use second_stack::uninit_slice;
+
 use crate::ser::{self, ForwardNamedToSeqProduct, Serialize};
 use crate::{AlgebraicType, AlgebraicValue, ArrayValue, MapValue, F32, F64};
 use core::convert::Infallible;
+use core::mem::MaybeUninit;
 use core::ptr;
 use std::alloc::{self, Layout};

@@ -96,10 +99,12 @@ impl ser::Serializer for ValueSerializer {
         chunks: I,
     ) -> Result<Self::Ok, Self::Error> {
         // SAFETY: Caller promised `total_bsatn_len == chunks.map(|c| c.len()).sum() <= isize::MAX`.
-        let bsatn = unsafe { concat_byte_chunks(total_bsatn_len, chunks) };
-
-        // SAFETY: Caller promised `AlgebraicValue::decode(ty, &mut bytes).is_ok()`.
-        unsafe { self.serialize_bsatn(ty, &bsatn) }
+        unsafe {
+            concat_byte_chunks_buf(total_bsatn_len, chunks, |bsatn| {
+                // SAFETY: Caller promised `AlgebraicValue::decode(ty, &mut bytes).is_ok()`.
+                ValueSerializer.serialize_bsatn(ty, bsatn)
+            })
+        }
     }

     unsafe fn serialize_str_in_chunks<'a, I: Iterator<Item = &'a [u8]>>(
@@ -136,33 +141,88 @@ unsafe fn concat_byte_chunks<'a>(total_len: usize, chunks: impl Iterator<Item =
         alloc::handle_alloc_error(layout);
     }

+    // Copy over each `chunk`.
+    // SAFETY:
+    // 1. `ptr` is valid for writes as we own it
+    //    and the caller promised that all `chunk`s will fit in `total_len`.
+    // 2. `ptr` points to a new allocation so it cannot overlap with any in `chunks`.
+    unsafe { write_byte_chunks(ptr, chunks) };
+
+    // Convert allocation to a `Vec<u8>`.
+    // SAFETY:
+    // - `ptr` was allocated using the global allocator.
+    // - `u8` and `ptr`'s allocation both have alignment of 1.
+    // - `ptr`'s allocation is `total_len <= isize::MAX`.
+    // - `total_len <= total_len` holds.
+    // - `total_len` values were initialized at type `u8`
+    //   as we know `total_len == chunks.map(|c| c.len()).sum()`.
+    unsafe { Vec::from_raw_parts(ptr, total_len, total_len) }
+}
+
+/// Calls `run` with the concatenation of `chunks`, which must total `total_len` bytes.
+///
+/// # Safety
+///
+/// - `total_len == chunks.map(|c| c.len()).sum() <= isize::MAX`
+pub unsafe fn concat_byte_chunks_buf<'a, R>(
+    total_len: usize,
+    chunks: impl Iterator<Item = &'a [u8]>,
+    run: impl FnOnce(&[u8]) -> R,
+) -> R {
+    uninit_slice(total_len, |buf: &mut [MaybeUninit<u8>]| {
+        let dst = buf.as_mut_ptr().cast();
+        debug_assert_eq!(total_len, buf.len());
+        // SAFETY:
+        // 1. `buf.len() == total_len`
+        // 2. `buf` cannot overlap with anything yielded by `chunks`.
+        unsafe { write_byte_chunks(dst, chunks) }
+        // SAFETY: Every byte of `buf` was initialized in the previous call
+        // as we know that `total_len == chunks.map(|c| c.len()).sum()`.
+        let bytes = unsafe { slice_assume_init_ref(buf) };
+        run(bytes)
+    })
+}
+
+/// Copies over each `chunk` in `chunks` to `dst`, writing `total_len` bytes to `dst`.
+///
+/// # Safety
+///
+/// Let `total_len == chunks.map(|c| c.len()).sum()`.
+/// 1. `dst` must be valid for writes for `total_len` bytes.
+/// 2. `dst..(dst + total_len)` does not overlap with any slice yielded by `chunks`.
+unsafe fn write_byte_chunks<'a>(mut dst: *mut u8, chunks: impl Iterator<Item = &'a [u8]>) {
     // Copy over each `chunk`, moving `dst` by `chunk.len()` each time.
-    let mut dst = ptr;
     for chunk in chunks {
         let len = chunk.len();
         // SAFETY:
-        // - `chunk` is valid for reads for `len` bytes.
-        // - `dst` is valid for writes as we own it
-        //   and as (1) caller promised that all `chunk`s will fit in `total_len`,
-        //   this entails that `dst..dst + len` is always in bounds of the allocation.
+        // - By the line above, `chunk` is valid for reads for `len` bytes.
+        // - By (1), `dst` is valid for writes as promised by the caller,
+        //   and all `chunk`s will fit in `total_len`.
+        //   This entails that `dst..dst + len` is always in bounds of the allocation.
         // - `chunk` and `dst` are trivially properly aligned (`align_of::<u8>() == 1`).
-        // - The allocation `ptr` points to is new so derived pointers cannot overlap with `chunk`.
+        // - By (2), derived pointers of `dst` cannot overlap with `chunk`.
         unsafe {
             ptr::copy_nonoverlapping(chunk.as_ptr(), dst, len);
         }
         // SAFETY: Same as (1).
        dst = unsafe { dst.add(len) };
     }
+}

-    // Convert allocation to a `Vec<u8>`.
-    // SAFETY:
-    // - `ptr` was allocated using global allocator.
-    // - `u8` and `ptr`'s allocation both have alignment of 1.
-    // - `ptr`'s allocation is `total_len <= isize::MAX`.
-    // - `total_len <= total_len` holds.
-    // - `total_len` values were initialized at type `u8`
-    //   as we know `total_len == chunks.map(|c| c.len()).sum()`.
-    unsafe { Vec::from_raw_parts(ptr, total_len, total_len) }
+/// Converts a `[MaybeUninit<T>]` into a `[T]` by asserting all elements are initialized.
+///
+/// Identical copy of the source of `MaybeUninit::slice_assume_init_ref`, but that's not stabilized.
+/// https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.slice_assume_init_ref
+///
+/// # Safety
+///
+/// All elements of `slice` must be initialized.
+pub const unsafe fn slice_assume_init_ref<T>(slice: &[MaybeUninit<T>]) -> &[T] {
+    // SAFETY: casting `slice` to a `*const [T]` is safe since the caller guarantees that
+    // `slice` is initialized, and `MaybeUninit` is guaranteed to have the same layout as `T`.
+    // The pointer obtained is valid since it refers to memory owned by `slice` which is a
+    // reference and thus guaranteed to be valid for reads.
+    unsafe { &*(slice as *const [MaybeUninit<T>] as *const [T]) }
 }

 /// Continuation for serializing an array.
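The new `concat_byte_chunks_buf` above leans on `second_stack::uninit_slice`, which lends out scratch space from a thread-local stack so the concatenated BSATN bytes no longer need a fresh heap `Vec` per call. Below is a minimal standalone sketch of the same pattern; the `with_concatenated` helper and its inputs are made up for illustration, and only the `uninit_slice` call shape is taken from the diff:

use core::mem::MaybeUninit;
use second_stack::uninit_slice;

/// Concatenates `chunks` into a scratch buffer borrowed from the thread-local
/// second stack, then hands the initialized bytes to `run`.
fn with_concatenated<R>(chunks: &[&[u8]], run: impl FnOnce(&[u8]) -> R) -> R {
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    uninit_slice(total, |buf: &mut [MaybeUninit<u8>]| {
        let mut i = 0;
        for chunk in chunks {
            for &byte in *chunk {
                buf[i].write(byte);
                i += 1;
            }
        }
        // SAFETY: the loops above wrote all `total == buf.len()` bytes.
        let bytes = unsafe { std::slice::from_raw_parts(buf.as_ptr().cast::<u8>(), buf.len()) };
        run(bytes)
    })
}

fn main() {
    let len = with_concatenated(&[b"hello, ".as_slice(), b"world".as_slice()], |bytes| bytes.len());
    assert_eq!(len, 12);
}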

crates/table/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ proptest = { workspace = true, optional = true }
 proptest-derive = { workspace = true, optional = true }

 [dev-dependencies]
+spacetimedb-sats = { workspace = true, features = ["proptest"] }
 criterion.workspace = true
 proptest.workspace = true
 proptest-derive.workspace = true

crates/table/benches/page.rs

Lines changed: 9 additions & 1 deletion
@@ -770,7 +770,15 @@ fn hash_in_page(c: &mut Criterion) {
     group.bench_function(name, |b| {
         let mut hasher = RowHash::hasher_builder().build_hasher();
         b.iter(|| {
-            unsafe { hash_row_in_page(&mut hasher, black_box(&page), black_box(offset), black_box(&ty)) };
+            unsafe {
+                hash_row_in_page(
+                    &mut hasher,
+                    black_box(&page),
+                    black_box(&NullBlobStore),
+                    black_box(offset),
+                    black_box(&ty),
+                )
+            };
             black_box(&mut hasher);
         });
     });

crates/table/benches/page_manager.rs

Lines changed: 22 additions & 23 deletions
@@ -524,16 +524,19 @@ fn use_type_throughput<T>(group: &mut BenchmarkGroup<'_, impl Measurement>) {

 fn table_insert_one_row(c: &mut Criterion) {
     fn bench_insert_row<R: Row>(group: Group<'_, '_>, val: R, name: &str) {
-        let mut table = make_table_for_row_type::<R>(name);
+        let table = make_table_for_row_type::<R>(name);
         let val = black_box(val.to_product());

         // Insert before benching to alloc and fault in a page.
-        let ptr = table.insert(&mut NullBlobStore, &val).unwrap().1;
-        let pre = |_, table: &mut Table| {
-            table.delete(&mut NullBlobStore, ptr).unwrap();
+        let mut ctx = (table, NullBlobStore);
+        let ptr = ctx.0.insert(&mut ctx.1, &val).unwrap().1.pointer();
+        let pre = |_, (table, bs): &mut (Table, NullBlobStore)| {
+            table.delete(bs, ptr).unwrap();
         };
         group.bench_function(name, |b| {
-            iter_time_with(b, &mut table, pre, |_, _, table| table.insert(&mut NullBlobStore, &val));
+            iter_time_with(b, &mut ctx, pre, |_, _, (table, bs)| {
+                table.insert(bs, &val).map(|r| r.1.pointer())
+            });
         });
     }

@@ -571,16 +574,15 @@ fn table_insert_one_row(c: &mut Criterion) {

 fn table_delete_one_row(c: &mut Criterion) {
     fn bench_delete_row<R: Row>(group: Group<'_, '_>, val: R, name: &str) {
-        let mut table = make_table_for_row_type::<R>(name);
+        let table = make_table_for_row_type::<R>(name);
         let val = val.to_product();

         // Insert before benching to alloc and fault in a page.
-        let insert = |_, table: &mut Table| table.insert(&mut NullBlobStore, &val).unwrap().1;
+        let mut ctx = (table, NullBlobStore);
+        let insert = |_: u64, (table, bs): &mut (Table, NullBlobStore)| table.insert(bs, &val).unwrap().1.pointer();

         group.bench_function(name, |b| {
-            iter_time_with(b, &mut table, insert, |ptr, _, table| {
-                table.delete(&mut NullBlobStore, ptr)
-            });
+            iter_time_with(b, &mut ctx, insert, |row, _, (table, bs)| table.delete(bs, row));
         });
     }

@@ -621,16 +623,10 @@ fn table_extract_one_row(c: &mut Criterion) {
         let mut table = make_table_for_row_type::<R>(name);
         let val = val.to_product();

-        let ptr = table.insert(&mut NullBlobStore, &val).unwrap().1;
+        let mut blob_store = NullBlobStore;
+        let row = black_box(table.insert(&mut blob_store, &val).unwrap().1);
         group.bench_function(name, |b| {
-            b.iter_with_large_drop(|| {
-                black_box(
-                    black_box(&table)
-                        .get_row_ref(&NullBlobStore, black_box(ptr))
-                        .unwrap()
-                        .to_product_value(),
-                )
-            });
+            b.iter_with_large_drop(|| black_box(row.to_product_value()));
         });
     }

@@ -760,7 +756,7 @@ fn insert_num_same<R: IndexedRow>(
             if let Some(slot) = row.elements.get_mut(1) {
                 *slot = n.into();
             }
-            tbl.insert(&mut NullBlobStore, &row).map(|(_, ptr)| ptr).ok()
+            tbl.insert(&mut NullBlobStore, &row).map(|(_, row)| row.pointer()).ok()
        })
        .last()
        .flatten()
@@ -815,18 +811,21 @@ fn index_insert(c: &mut Criterion) {
        same_ratio: f64,
    ) {
        let make_row_move = &mut make_row;
-        let (mut tbl, num_same, _) = make_table_with_same_ratio::<R>(make_row_move, num_rows, same_ratio);
+        let (tbl, num_same, _) = make_table_with_same_ratio::<R>(make_row_move, num_rows, same_ratio);
+        let mut ctx = (tbl, NullBlobStore);

        group.bench_with_input(
            bench_id_for_index(name, num_rows, same_ratio, num_same),
            &num_rows,
            |b, &num_rows| {
-                let pre = |_, tbl: &mut Table| {
+                let pre = |_, (tbl, _): &mut (Table, NullBlobStore)| {
                    clear_all_same::<R>(tbl, num_rows);
                    insert_num_same(tbl, || make_row(num_rows), num_same - 1);
                    make_row(num_rows).to_product()
                };
-                iter_time_with(b, &mut tbl, pre, |row, _, tbl| tbl.insert(&mut NullBlobStore, &row));
+                iter_time_with(b, &mut ctx, pre, |row, _, (tbl, bs)| {
+                    tbl.insert(bs, &row).map(|r| r.1.pointer())
+                });
            },
        );
    }
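A recurring pattern in these bench changes is calling `.pointer()` right after `insert`: the returned row reference borrows the table, so the benches copy out a plain pointer before the table is mutated again. A simplified, self-contained sketch of why that matters; every type here is a stand-in for illustration, not the real `spacetimedb_table` API:

// Stand-in types illustrating the borrow pattern only.
struct Table(Vec<u64>);

#[derive(Clone, Copy, Debug, PartialEq)]
struct RowPointer(usize);

struct RowRef<'a> {
    table: &'a Table,
    idx: usize,
}

impl Table {
    // Returns a reference-like view that borrows `self`, as `Table::insert` now does.
    fn insert(&mut self, v: u64) -> RowRef<'_> {
        self.0.push(v);
        let idx = self.0.len() - 1;
        RowRef { table: self, idx }
    }
}

impl RowRef<'_> {
    fn pointer(&self) -> RowPointer {
        RowPointer(self.idx)
    }
    fn value(&self) -> u64 {
        self.table.0[self.idx]
    }
}

fn main() {
    let mut table = Table(Vec::new());
    // Copy the plain pointer out so the borrow of `table` ends right away...
    let ptr = table.insert(1).pointer();
    // ...which lets the table be mutated again afterwards.
    assert_eq!(table.insert(2).value(), 2);
    assert_eq!(ptr, RowPointer(0));
}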
(new file; path not captured in this view)

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+# Seeds for failure cases proptest has generated in the past. It is
+# automatically read and these particular cases re-run before any
+# novel cases are generated.
+#
+# It is recommended to check this file in to source control so that
+# everyone who runs the test benefits from these saved cases.
+cc 3e55b94365a0ae7698bb9e89259f3f5b84227b1c5ba2f0737be0360f55256c26 # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Sum(SumType { variants: [SumTypeVariant { name: None, algebraic_type: Builtin(Bool) }] }) }] }, ProductValue { elements: [Sum(SumValue { tag: 0, value: Bool(false) })] })
+cc aedcfc0fa45005cb11fa8b47f668a8b68c99adadfb50fdc6219840aa8ffd83f6 # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Sum(SumType { variants: [SumTypeVariant { name: None, algebraic_type: Builtin(I32) }, SumTypeVariant { name: None, algebraic_type: Builtin(F64) }, SumTypeVariant { name: None, algebraic_type: Builtin(String) }, SumTypeVariant { name: None, algebraic_type: Builtin(U16) }, SumTypeVariant { name: None, algebraic_type: Builtin(U16) }, SumTypeVariant { name: None, algebraic_type: Builtin(String) }, SumTypeVariant { name: None, algebraic_type: Builtin(Bool) }, SumTypeVariant { name: None, algebraic_type: Builtin(I16) }, SumTypeVariant { name: None, algebraic_type: Builtin(I16) }, SumTypeVariant { name: None, algebraic_type: Builtin(I64) }, SumTypeVariant { name: None, algebraic_type: Builtin(I128) }, SumTypeVariant { name: None, algebraic_type: Builtin(U64) }] }) }] }, ProductValue { elements: [Sum(SumValue { tag: 2, value: String("יּ/Ⱥ🂠છrÔ") })] })
