Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ postgres-types = "0.2.5"
proc-macro2 = "1.0"
prometheus = "0.13.0"
proptest = "1.2.0"
proptest-derive = "0.4.0"
prost = "0.10"
prost-build = { version = "0.10" }
quick-junit = { version = "0.3.2" }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ default = ["odb_sled"]
[dev-dependencies]
rusqlite.workspace = true
criterion.workspace = true
proptest.workspace = true
proptest-derive.workspace = true
rand.workspace = true

[build-dependencies]
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,8 @@ impl Iterator for IterSegment {

fn next(&mut self) -> Option<Self::Item> {
let next = self.inner.next()?;
Some(next.map(|bytes| {
// It seems very improbable that `decode` is infallible...
let (commit, _) = Commit::decode(bytes);
commit
}))
let io = |e| io::Error::new(io::ErrorKind::InvalidData, e);
Some(next.and_then(|bytes| Commit::decode(&mut bytes.as_slice()).map_err(io)))
}
}

Expand Down
195 changes: 125 additions & 70 deletions crates/core/src/db/messages/commit.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,121 @@
use anyhow::{bail, Context as _};
use spacetimedb_sats::buffer::{BufReader, BufWriter};
use std::{fmt, sync::Arc};

use super::transaction::Transaction;
use crate::hash::Hash;
use std::sync::Arc;

// aka "Block" from blockchain, aka RecordBatch, aka TxBatch
#[derive(Debug)]
#[cfg(test)]
use proptest::prelude::*;
#[cfg(test)]
use proptest_derive::Arbitrary;

/// A commit is one record in the write-ahead log.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A drive by thought: similarly to how we have a bunch of code that says set_id but commented "this is the table id", I would love to settle on saying "write ahead log" everywhere, including data type names etc. Or some other name. But let's pick one!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd love to settle on WAL / write ahead log, maybe even collapsing CommitLog and MessageLog into a single type. Possibly not for this PR, though?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'd love to have us try to take things that way. I agree on the naming, I think the commit log and message log names are implementation details and that "write ahead log" is the name that reflects the semantics the datastore needs to care about. But yeah not for this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just adding my two cents here. I agree on coalescing on the name. I avoided WAL because historically that's referred to a log that is eventually compressed/deleted, but I think on balance WAL is probably the best name. We can include a comment about the fact that the WAL should never be deleted.

We should also note that it will include information which is not normally in a WAL, including reducer call event info and that it forms a Merkle DAG between commits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The term MessageLog was borrowed from Kafka.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted to include all of these in the upcoming formalization / design changes doc.

///
/// Encoding:
///
/// ```text
/// [0u8 | 1u8<hash(32)>]<commit_offset(8)><min_tx_offset(8)>[<transaction>...]
/// ```
///
/// Decoding empty input yields [`Commit::default`].
#[derive(Debug, Default, PartialEq)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct Commit {
/// The [`Hash`] over the encoded bytes of the previous commit, or `None` if
/// it is the very first commit.
///
/// Encoded as a one-byte tag (`0` for `None`, `1` for `Some`), followed by
/// the hash bytes when present.
#[cfg_attr(test, proptest(strategy = "arbitrary::parent_commit_hash()"))]
pub parent_commit_hash: Option<Hash>,
/// Counter of all commits in a log.
pub commit_offset: u64,
/// Counter of all transactions in a log.
///
/// That is, a per-log value which is incremented by `transactions.len()`
/// when the [`Commit`] is constructed.
pub min_tx_offset: u64,
/// The [`Transaction`]s in this commit, usually only one.
///
/// Encoded back to back without a count prefix: decoding keeps reading
/// transactions until the input is exhausted.
#[cfg_attr(test, proptest(strategy = "arbitrary::transactions()"))]
pub transactions: Vec<Arc<Transaction>>,
}

// TODO: Maybe a transaction buffer hash?
// commit: <parent_commit_hash(32)><commit_offset(8)><min_tx_offset(8)>[<transaction>...]*
impl Commit {
pub fn decode(bytes: impl AsRef<[u8]>) -> (Self, usize) {
let bytes = &mut bytes.as_ref();
if bytes.is_empty() {
return (
Commit {
parent_commit_hash: None,
commit_offset: 0,
min_tx_offset: 0,
transactions: Vec::new(),
},
0,
);
#[cfg(test)]
mod arbitrary {
    use super::*;

    /// Strategy for the optional parent commit hash.
    ///
    /// [`Hash`] is defined in `lib`, so orphan rules currently prevent an
    /// [`Arbitrary`] impl for it; the hash is built from an arbitrary 32-byte
    /// array instead.
    pub fn parent_commit_hash() -> impl Strategy<Value = Option<Hash>> {
        let raw_bytes = any::<Option<[u8; 32]>>();
        raw_bytes.prop_map(|bytes| bytes.map(|data| Hash { data }))
    }

    /// Strategy for the transactions of a commit, with an upper bound on how
    /// many are generated (the length is drawn from `1..8`).
    ///
    /// We only ever commit a single transaction in practice.
    pub fn transactions() -> impl Strategy<Value = Vec<Arc<Transaction>>> {
        let single_tx = any::<Arc<Transaction>>();
        prop::collection::vec(single_tx, 1..8)
    }
}

/// Error context for [`Commit::decode`].
///
/// Each variant names the part of an encoded commit that could not be read.
/// The [`fmt::Display`] impl renders it as the message attached to the
/// decoding error.
enum Context {
    /// The one-byte tag of the optional parent commit hash.
    Parent,
    /// The bytes of the parent commit hash itself.
    Hash,
    /// The commit offset.
    CommitOffset,
    /// The minimum transaction offset.
    MinTxOffset,
    /// A transaction, identified by the number the decoder supplies.
    Transaction(usize),
}

impl fmt::Display for Context {
    /// Writes `"Failed to decode `Commit`: "` followed by a short description
    /// of the part that failed to decode.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Failed to decode `Commit`: ")?;
        match self {
            Self::Parent => f.write_str("parent commit hash tag"),
            Self::Hash => f.write_str("parent commit hash"),
            Self::CommitOffset => f.write_str("commit offset"),
            Self::MinTxOffset => f.write_str("min transaction offset"),
            // Format straight into the formatter rather than building a
            // temporary `String` first.
            Self::Transaction(n) => write!(f, "transaction {n}"),
        }
    }
}

let mut read_count = 0;
impl Commit {
pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> anyhow::Result<Self> {
if reader.remaining() == 0 {
return Ok(Self::default());
}

let parent_commit_hash = if bytes[read_count] != 0 {
read_count += 1;
let parent_commit_hash = Hash::from_slice(&bytes[read_count..read_count + 32]);
read_count += 32;
Some(parent_commit_hash)
} else {
read_count += 1;
None
let parent_commit_hash = match reader.get_u8().context(Context::Parent)? {
0 => None,
1 => reader
.get_array()
.map(|data| Hash { data })
.map(Some)
.context(Context::Hash)?,
x => bail!("Invalid tag for `Option<Hash>`: {x}"),
};

let mut dst = [0u8; 8];
dst.copy_from_slice(&bytes[read_count..read_count + 8]);
let commit_offset = u64::from_le_bytes(dst);
read_count += 8;

let mut dst = [0u8; 8];
dst.copy_from_slice(&bytes[read_count..read_count + 8]);
let min_tx_offset = u64::from_le_bytes(dst);
read_count += 8;

let mut transactions: Vec<Arc<Transaction>> = Vec::new();
while read_count < bytes.len() {
let (tx, read) = Transaction::decode(&bytes[read_count..]);
read_count += read;
transactions.push(Arc::new(tx));
let commit_offset = reader.get_u64().context(Context::CommitOffset)?;
let min_tx_offset = reader.get_u64().context(Context::MinTxOffset)?;
let mut transactions = Vec::new();
while reader.remaining() > 0 {
let tx = Transaction::decode(reader)
.map(Arc::new)
.with_context(|| Context::Transaction(transactions.len() + 1))?;
transactions.push(tx);
}

(
Commit {
parent_commit_hash,
commit_offset,
min_tx_offset,
transactions,
},
read_count,
)
Ok(Self {
parent_commit_hash,
commit_offset,
min_tx_offset,
transactions,
})
}

pub fn encoded_len(&self) -> usize {
let mut count = 0;

if self.parent_commit_hash.is_none() {
count += 1;
} else {
count += 1;
count += self.parent_commit_hash.unwrap().data.len();
count += 1; // tag for option
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: collapse these

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if let Some(hash) = self.parent_commit_hash {
count += hash.data.len();
}

// 8 for commit_offset
Expand All @@ -91,21 +131,36 @@ impl Commit {
count
}

pub fn encode(&self, bytes: &mut Vec<u8>) {
bytes.reserve(self.encoded_len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little unfortunate that we're losing this optimization, but maybe we can improve things by not using a Vec at all but a buffer pool or something in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I meant to make sure the caller takes care of this, will add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if self.parent_commit_hash.is_none() {
bytes.push(0);
} else {
bytes.push(1);
bytes.extend(self.parent_commit_hash.unwrap().data);
pub fn encode(&self, writer: &mut impl BufWriter) {
match self.parent_commit_hash {
Some(hash) => {
writer.put_u8(1);
writer.put_slice(&hash.data);
}
None => writer.put_u8(0),
}

bytes.extend(self.commit_offset.to_le_bytes());
bytes.extend(self.min_tx_offset.to_le_bytes());

writer.put_u64(self.commit_offset);
writer.put_u64(self.min_tx_offset);
for tx in &self.transactions {
tx.encode(bytes);
tx.encode(writer);
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    proptest! {
        // Arbitrary commits are expensive to generate, so only a small number
        // of cases is run.
        #![proptest_config(ProptestConfig::with_cases(64))]
        #[test]
        fn prop_commit_encoding_roundtrip(commit in any::<Commit>()) {
            // Encode into a fresh buffer ...
            let mut encoded = Vec::new();
            commit.encode(&mut encoded);

            // ... and check that decoding those bytes gives the same commit back.
            let mut input = encoded.as_slice();
            let roundtripped = Commit::decode(&mut input).unwrap();
            prop_assert_eq!(commit, roundtripped)
        }
    }
}
82 changes: 56 additions & 26 deletions crates/core/src/db/messages/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,69 @@
use anyhow::Context as _;
use spacetimedb_sats::buffer::{BufReader, BufWriter};
use std::fmt;

use super::write::Write;

// aka Record
// Must be atomically, durably written to disk
#[derive(Debug, Clone)]
#[cfg(test)]
use proptest::prelude::*;
#[cfg(test)]
use proptest_derive::Arbitrary;

/// A transaction, consisting of one or more [`Write`]s.
///
/// Encoding:
///
/// ```text
/// <n(4)>[<write_0(6-38)>...<write_{n-1}(6-38)>]
/// ```
///
/// where `n` is the number of writes, stored as a `u32`.
///
/// Decoding empty input yields [`Transaction::default`], i.e. a transaction
/// without any writes.
#[derive(Debug, Clone, Default, PartialEq)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct Transaction {
/// The writes of this transaction, in the order they are encoded.
#[cfg_attr(test, proptest(strategy = "arbitrary::writes()"))]
pub writes: Vec<Write>,
}

// tx: [<write>...(dedupped and sorted_numerically)]*
impl Transaction {
pub fn decode(bytes: impl AsRef<[u8]>) -> (Self, usize) {
let bytes = &mut bytes.as_ref();
if bytes.is_empty() {
return (Transaction { writes: Vec::new() }, 0);
}
#[cfg(test)]
mod arbitrary {
    use super::*;

    /// Strategy for the writes of a transaction.
    ///
    /// The length is drawn from `1..64`; it is bounded for performance
    /// reasons.
    pub fn writes() -> impl Strategy<Value = Vec<Write>> {
        let single_write = any::<Write>();
        prop::collection::vec(single_write, 1..64)
    }
}

let mut bytes_read = 0;
/// Error context for [`Transaction::decode`].
///
/// Each variant names the part of an encoded transaction that could not be
/// read.
enum Context {
/// The leading count of writes could not be read.
Len,
/// The write at this zero-based position could not be decoded.
Write(u32),
}

let mut dst = [0u8; 4];
dst.copy_from_slice(&bytes[bytes_read..bytes_read + 4]);
let writes_count = u32::from_le_bytes(dst);
bytes_read += 4;
impl fmt::Display for Context {
    /// Writes `"Failed to decode `Transaction`: "` followed by a short
    /// description of the part that failed to decode.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Failed to decode `Transaction`: ")?;
        match self {
            Self::Len => f.write_str("number of writes"),
            // Format straight into the formatter rather than building a
            // temporary `String` first.
            Self::Write(n) => write!(f, "write {n}"),
        }
    }
}

let mut writes: Vec<Write> = Vec::with_capacity(writes_count as usize);
// tx: [<write>...(deduplicated and sorted numerically)]*
impl Transaction {
pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> anyhow::Result<Self> {
if reader.remaining() == 0 {
return Ok(Self::default());
}

let mut count = 0;
while bytes_read < bytes.len() && count < writes_count {
let (write, read) = Write::decode(&bytes[bytes_read..]);
bytes_read += read;
let n = reader.get_u32().context(Context::Len)?;
let mut writes = Vec::with_capacity(n as usize);
for i in 0..n {
let write = Write::decode(reader).with_context(|| Context::Write(i))?;
writes.push(write);
count += 1;
}

(Transaction { writes }, bytes_read)
Ok(Self { writes })
}

pub fn encoded_len(&self) -> usize {
Expand All @@ -43,11 +74,10 @@ impl Transaction {
count
}

pub fn encode(&self, bytes: &mut Vec<u8>) {
bytes.extend((self.writes.len() as u32).to_le_bytes());

pub fn encode(&self, writer: &mut impl BufWriter) {
writer.put_u32(self.writes.len() as u32);
for write in &self.writes {
write.encode(bytes);
write.encode(writer);
}
}
}
Loading