diff --git a/Cargo.lock b/Cargo.lock index 06d304fbc9f..2aed621ef05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3102,6 +3102,17 @@ dependencies = [ "unarray", ] +[[package]] +name = "proptest-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf16337405ca084e9c78985114633b6827711d22b9e6ef6c6c0d665eb3f0b6e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost" version = "0.10.4" @@ -4277,6 +4288,8 @@ dependencies = [ "paste", "pin-project-lite", "prometheus", + "proptest", + "proptest-derive", "prost", "prost-build", "rand", diff --git a/Cargo.toml b/Cargo.toml index ac71e849791..7fe900391f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ postgres-types = "0.2.5" proc-macro2 = "1.0" prometheus = "0.13.0" proptest = "1.2.0" +proptest-derive = "0.4.0" prost = "0.10" prost-build = { version = "0.10" } quick-junit = { version = "0.3.2" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index e37a38a8e52..470697b309e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -101,6 +101,8 @@ default = ["odb_sled"] [dev-dependencies] rusqlite.workspace = true criterion.workspace = true +proptest.workspace = true +proptest-derive.workspace = true rand.workspace = true [build-dependencies] diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs index a40cc941bd7..cac41e43616 100644 --- a/crates/core/src/db/commit_log.rs +++ b/crates/core/src/db/commit_log.rs @@ -167,7 +167,7 @@ impl CommitLog { } } - let mut bytes = Vec::new(); + let mut bytes = Vec::with_capacity(unwritten_commit.encoded_len()); unwritten_commit.encode(&mut bytes); unwritten_commit.parent_commit_hash = Some(hash_bytes(&bytes)); @@ -279,11 +279,8 @@ impl Iterator for IterSegment { fn next(&mut self) -> Option { let next = self.inner.next()?; - Some(next.map(|bytes| { - // It seems very improbable that `decode` is infallible... 
- let (commit, _) = Commit::decode(bytes); - commit - })) + let io = |e| io::Error::new(io::ErrorKind::InvalidData, e); + Some(next.and_then(|bytes| Commit::decode(&mut bytes.as_slice()).map_err(io))) } } diff --git a/crates/core/src/db/messages/commit.rs b/crates/core/src/db/messages/commit.rs index 097b787c378..77b38a1d7bd 100644 --- a/crates/core/src/db/messages/commit.rs +++ b/crates/core/src/db/messages/commit.rs @@ -1,81 +1,119 @@ +use anyhow::{bail, Context as _}; +use spacetimedb_sats::buffer::{BufReader, BufWriter}; +use std::{fmt, sync::Arc}; + use super::transaction::Transaction; use crate::hash::Hash; -use std::sync::Arc; -// aka "Block" from blockchain, aka RecordBatch, aka TxBatch -#[derive(Debug)] +#[cfg(test)] +use proptest::prelude::*; +#[cfg(test)] +use proptest_derive::Arbitrary; + +/// A commit is one record in the write-ahead log. +/// +/// Encoding: +/// +/// ```text +/// [0u8 | 1u8][...] +/// ``` +#[derive(Debug, Default, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] pub struct Commit { + /// The [`Hash`] over the encoded bytes of the previous commit, or `None` if + /// it is the very first commit. + #[cfg_attr(test, proptest(strategy = "arbitrary::parent_commit_hash()"))] pub parent_commit_hash: Option, + /// Counter of all commits in a log. pub commit_offset: u64, + /// Counter of all transactions in a log. + /// + /// That is, a per-log value which is incremented by `transactions.len()` + /// when the [`Commit`] is constructed. pub min_tx_offset: u64, + /// The [`Transaction`]s in this commit, usually only one. + #[cfg_attr(test, proptest(strategy = "arbitrary::transactions()"))] pub transactions: Vec>, } -// TODO: Maybe a transaction buffer hash? 
-// commit: [...]* -impl Commit { - pub fn decode(bytes: impl AsRef<[u8]>) -> (Self, usize) { - let bytes = &mut bytes.as_ref(); - if bytes.is_empty() { - return ( - Commit { - parent_commit_hash: None, - commit_offset: 0, - min_tx_offset: 0, - transactions: Vec::new(), - }, - 0, - ); +#[cfg(test)] +mod arbitrary { + use super::*; + + // [`Hash`] is defined in `lib`, so we can't have an [`Arbitrary`] impl for + // it just yet due to orphan rules. + pub fn parent_commit_hash() -> impl Strategy> { + any::>().prop_map(|maybe_hash| maybe_hash.map(|data| Hash { data })) + } + + // Custom strategy to apply an upper bound on the number of [`Transaction`]s + // generated. + // + // We only ever commit a single transaction in practice. + pub fn transactions() -> impl Strategy>> { + prop::collection::vec(any::>(), 1..8) + } +} + +/// Error context for [`Commit::decode`] +enum Context { + Parent, + Hash, + CommitOffset, + MinTxOffset, + Transaction(usize), +} + +impl fmt::Display for Context { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Failed to decode `Commit`: ")?; + match self { + Self::Parent => f.write_str("parent commit hash tag"), + Self::Hash => f.write_str("parent commit hash"), + Self::CommitOffset => f.write_str("commit offset"), + Self::MinTxOffset => f.write_str("min transaction offset"), + Self::Transaction(n) => f.write_str(&format!("transaction {n}")), } + } +} - let mut read_count = 0; +impl Commit { + pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> anyhow::Result { + if reader.remaining() == 0 { + return Ok(Self::default()); + } - let parent_commit_hash = if bytes[read_count] != 0 { - read_count += 1; - let parent_commit_hash = Hash::from_slice(&bytes[read_count..read_count + 32]); - read_count += 32; - Some(parent_commit_hash) - } else { - read_count += 1; - None + let parent_commit_hash = match reader.get_u8().context(Context::Parent)? 
{ + 0 => None, + 1 => reader + .get_array() + .map(|data| Hash { data }) + .map(Some) + .context(Context::Hash)?, + x => bail!("Invalid tag for `Option`: {x}"), }; - - let mut dst = [0u8; 8]; - dst.copy_from_slice(&bytes[read_count..read_count + 8]); - let commit_offset = u64::from_le_bytes(dst); - read_count += 8; - - let mut dst = [0u8; 8]; - dst.copy_from_slice(&bytes[read_count..read_count + 8]); - let min_tx_offset = u64::from_le_bytes(dst); - read_count += 8; - - let mut transactions: Vec> = Vec::new(); - while read_count < bytes.len() { - let (tx, read) = Transaction::decode(&bytes[read_count..]); - read_count += read; - transactions.push(Arc::new(tx)); + let commit_offset = reader.get_u64().context(Context::CommitOffset)?; + let min_tx_offset = reader.get_u64().context(Context::MinTxOffset)?; + let mut transactions = Vec::new(); + while reader.remaining() > 0 { + let tx = Transaction::decode(reader) + .map(Arc::new) + .with_context(|| Context::Transaction(transactions.len() + 1))?; + transactions.push(tx); } - ( - Commit { - parent_commit_hash, - commit_offset, - min_tx_offset, - transactions, - }, - read_count, - ) + Ok(Self { + parent_commit_hash, + commit_offset, + min_tx_offset, + transactions, + }) } pub fn encoded_len(&self) -> usize { - let mut count = 0; - - if self.parent_commit_hash.is_none() { - count += 1; - } else { - count += 1; - count += self.parent_commit_hash.unwrap().data.len(); + let mut count = 1; // tag for option + if let Some(hash) = self.parent_commit_hash { + count += hash.data.len(); } // 8 for commit_offset @@ -91,21 +129,48 @@ impl Commit { count } - pub fn encode(&self, bytes: &mut Vec) { - bytes.reserve(self.encoded_len()); - - if self.parent_commit_hash.is_none() { - bytes.push(0); - } else { - bytes.push(1); - bytes.extend(self.parent_commit_hash.unwrap().data); + pub fn encode(&self, writer: &mut impl BufWriter) { + match self.parent_commit_hash { + Some(hash) => { + writer.put_u8(1); + writer.put_slice(&hash.data); + } + 
None => writer.put_u8(0), } + writer.put_u64(self.commit_offset); + writer.put_u64(self.min_tx_offset); + for tx in &self.transactions { + tx.encode(writer); + } + } +} - bytes.extend(self.commit_offset.to_le_bytes()); - bytes.extend(self.min_tx_offset.to_le_bytes()); +#[cfg(test)] +mod tests { + use super::*; + + proptest! { + // Generating arbitrary commits is quite slow, so limit to just a few + // cases. + // + // Note that this config applies to all `#[test]`s within the enclosing + // `proptest!`. + #![proptest_config(ProptestConfig::with_cases(64))] + + + #[test] + fn prop_commit_encoding_roundtrip(commit in any::()) { + let mut buf = Vec::new(); + commit.encode(&mut buf); + let decoded = Commit::decode(&mut buf.as_slice()).unwrap(); + prop_assert_eq!(commit, decoded) + } - for tx in &self.transactions { - tx.encode(bytes); + #[test] + fn prop_encoded_len_is_encoded_len(commit in any::()) { + let mut buf = Vec::new(); + commit.encode(&mut buf); + prop_assert_eq!(buf.len(), commit.encoded_len()) } } } diff --git a/crates/core/src/db/messages/transaction.rs b/crates/core/src/db/messages/transaction.rs index b13b364c544..7703e88e027 100644 --- a/crates/core/src/db/messages/transaction.rs +++ b/crates/core/src/db/messages/transaction.rs @@ -1,38 +1,69 @@ +use anyhow::Context as _; +use spacetimedb_sats::buffer::{BufReader, BufWriter}; +use std::fmt; + use super::write::Write; -// aka Record -// Must be atomically, durably written to disk -#[derive(Debug, Clone)] +#[cfg(test)] +use proptest::prelude::*; +#[cfg(test)] +use proptest_derive::Arbitrary; + +/// A transaction, consisting of one or more [`Write`]s. 
+/// +/// Encoding: +/// +/// ```text +/// [] +/// ``` +#[derive(Debug, Clone, Default, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] pub struct Transaction { + #[cfg_attr(test, proptest(strategy = "arbitrary::writes()"))] pub writes: Vec, } -// tx: [...(dedupped and sorted_numerically)]* -impl Transaction { - pub fn decode(bytes: impl AsRef<[u8]>) -> (Self, usize) { - let bytes = &mut bytes.as_ref(); - if bytes.is_empty() { - return (Transaction { writes: Vec::new() }, 0); - } +#[cfg(test)] +mod arbitrary { + use super::*; + + // Limit to 64 for performance reasons. + pub fn writes() -> impl Strategy> { + prop::collection::vec(any::(), 1..64) + } +} - let mut bytes_read = 0; +/// Error context for [`Transaction::decode`]. +enum Context { + Len, + Write(u32), +} - let mut dst = [0u8; 4]; - dst.copy_from_slice(&bytes[bytes_read..bytes_read + 4]); - let writes_count = u32::from_le_bytes(dst); - bytes_read += 4; +impl fmt::Display for Context { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Failed to decode `Transaction`: ")?; + match self { + Self::Len => f.write_str("number of writes"), + Self::Write(n) => f.write_str(&format!("write {n}")), + } + } +} - let mut writes: Vec = Vec::with_capacity(writes_count as usize); +// tx: [...(dedupped and sorted_numerically)]* +impl Transaction { + pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> anyhow::Result { + if reader.remaining() == 0 { + return Ok(Self::default()); + } - let mut count = 0; - while bytes_read < bytes.len() && count < writes_count { - let (write, read) = Write::decode(&bytes[bytes_read..]); - bytes_read += read; + let n = reader.get_u32().context(Context::Len)?; + let mut writes = Vec::with_capacity((n as usize).min(reader.remaining())); // `n` is untrusted input: never reserve more than the bytes left + for i in 0..n { + let write = Write::decode(reader).with_context(|| Context::Write(i))?; writes.push(write); - count += 1; } - (Transaction { writes }, bytes_read) + Ok(Self { writes }) } pub fn encoded_len(&self) -> usize { @@ -43,11 +74,10 @@ impl
Transaction { count } - pub fn encode(&self, bytes: &mut Vec) { - bytes.extend((self.writes.len() as u32).to_le_bytes()); - + pub fn encode(&self, writer: &mut impl BufWriter) { + writer.put_u32(self.writes.len() as u32); for write in &self.writes { - write.encode(bytes); + write.encode(writer); } } } diff --git a/crates/core/src/db/messages/write.rs b/crates/core/src/db/messages/write.rs index 35c89706a09..b02e98410e1 100644 --- a/crates/core/src/db/messages/write.rs +++ b/crates/core/src/db/messages/write.rs @@ -1,13 +1,57 @@ +use std::fmt; + +use anyhow::Context as _; pub use spacetimedb_lib::DataKey; +use spacetimedb_sats::buffer::{BufReader, BufWriter, DecodeError}; + +#[cfg(test)] +use proptest::prelude::*; +#[cfg(test)] +use proptest_derive::Arbitrary; -#[derive(Debug, Copy, Clone)] +/// A single write operation within a [`super::transaction::Transaction`]. +/// +/// Encoding: +/// +/// ```text +/// +/// ``` +#[derive(Debug, Copy, Clone, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] pub struct Write { pub operation: Operation, - pub set_id: u32, + pub set_id: u32, // aka table id + #[cfg_attr(test, proptest(strategy = "arbitrary::datakey()"))] pub data_key: DataKey, } -#[derive(Debug, Copy, Clone)] +#[cfg(test)] +mod arbitrary { + use super::*; + + // [`DataKey`] is defined in `lib`, so we can't have an [`Arbitrary`] impl + // for it just yet due to orphan rules. + pub fn datakey() -> impl Strategy { + // Create [`DataKey::Inline`] and [`DataKey::Hash`] with the same + // probability. [`DataKey::from_data`] will take care of choosing the + // variant based on the input length. + prop_oneof![ + prop::collection::vec(any::(), 0..31).prop_map(DataKey::from_data), + prop::collection::vec(any::(), 31..255).prop_map(DataKey::from_data) + ] + } +} + +/// The operation of a [`Write`], either insert or delete.
+/// +/// Encoded as a single byte with bits: +/// +/// ```text +/// 0b1000_0000 (most significant bit) = insert / delete +/// 0b0111_1111 (remaining bits) = unused +/// ``` +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] #[repr(u8)] pub enum Operation { Delete = 0, @@ -28,60 +72,66 @@ impl Operation { _ => Self::Insert, } } + + pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> Result { + let flags = reader.get_u8()?; + let op = (flags & 0b1000_0000) >> 7; + + Ok(Self::from_u8(op)) + } + + pub fn encoded_len(&self) -> usize { + 1 + } + + pub fn encode(&self, writer: &mut impl BufWriter) { + let mut flags = 0u8; + flags = if self.to_u8() != 0 { flags | 0b1000_0000 } else { flags }; + writer.put_u8(flags); + } +} + +/// Error context for [`Write::decode`]. +enum Context { + Op, + SetId, + DataKey, +} + +impl fmt::Display for Context { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Failed to decode `Write`: ")?; + match self { + Self::Op => f.write_str("operation flags"), + Self::SetId => f.write_str("set id"), + Self::DataKey => f.write_str("data key"), + } + } } impl Write { - // write_flags: - // b0 = insert/delete, - // b1 = unused, - // b2 = unused, - // b3,b4,b5,b6,b7 unused - // write: - pub fn decode(bytes: impl AsRef<[u8]>) -> (Self, usize) { - let bytes = &mut bytes.as_ref(); - let mut read_count = 0; - - let flags = bytes[read_count]; - read_count += 1; - - let op = (flags & 0b1000_0000) >> 7; + pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> anyhow::Result { + let operation = Operation::decode(reader).context(Context::Op)?; + let set_id = reader.get_u32().context(Context::SetId)?; + let data_key = DataKey::decode(reader).context(Context::DataKey)?; - let mut dst = [0u8; 4]; - dst.copy_from_slice(&bytes[read_count..read_count + 4]); - let set_id = u32::from_le_bytes(dst); - read_count += 4; - - let mut reader = &bytes[read_count..]; - let orig_len = reader.len(); - let value = DataKey::decode(&mut reader).unwrap(); - read_count += orig_len
- reader.len(); - - ( - Write { - operation: Operation::from_u8(op), - set_id, - data_key: value, - }, - read_count, - ) + Ok(Self { + operation, + set_id, + data_key, + }) } pub fn encoded_len(&self) -> usize { - // 1 for flags, 4 for set_id - let mut count = 1 + 4; + let mut count = self.operation.encoded_len(); + count += 4; // set_id count += self.data_key.encoded_len(); count } - pub fn encode(&self, bytes: &mut Vec) { - let mut flags: u8 = 0; - flags = if self.operation.to_u8() != 0 { - flags | 0b1000_0000 - } else { - flags - }; - bytes.push(flags); - bytes.extend(self.set_id.to_le_bytes()); - self.data_key.encode(bytes); + pub fn encode(&self, writer: &mut impl BufWriter) { + self.operation.encode(writer); + writer.put_u32(self.set_id); + self.data_key.encode(writer); } }