Skip to content

Commit e8aed85

Browse files
authored
core: Provide read access to commit/message log and odb (#265)
* core: Provide read access to commit/message log and odb This is a first approximation to provide data access for Kafka-style replication. It punts on notifications of segment rotation (which could also be solved via filesystem events). ObjectDB access is fairly crude, and will likely require its own replication subsystem. * fixup! Merge remote-tracking branch 'origin/master' into kim/log-access * Add some tests Also make max segment size configurable, so tests don't have to write >= 1GiB worth of data. * Abide to the Rust naming conventions * Add test for commit iter * Remove `MessageLogIter`, use `commit_log::Iter` instead (#272)
1 parent 3f5f97f commit e8aed85

File tree

3 files changed

+583
-122
lines changed

3 files changed

+583
-122
lines changed

crates/core/src/db/commit_log.rs

Lines changed: 260 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{
22
datastore::traits::{MutTxDatastore, TxData},
3-
message_log::MessageLog,
3+
message_log::{self, MessageLog},
44
messages::commit::Commit,
55
ostorage::ObjectDB,
66
};
@@ -14,7 +14,13 @@ use crate::{
1414
},
1515
error::DBError,
1616
};
17-
use spacetimedb_lib::hash::hash_bytes;
17+
18+
use spacetimedb_lib::{
19+
hash::{hash_bytes, Hash},
20+
DataKey,
21+
};
22+
23+
use std::io;
1824
use std::sync::Arc;
1925
use std::sync::Mutex;
2026

@@ -50,24 +56,30 @@ impl CommitLog {
5056
D: MutTxDatastore<RowId = RowId>,
5157
{
5258
if let Some(bytes) = self.generate_commit(tx_data, datastore) {
53-
if let Some(mlog) = &self.mlog {
54-
let mut mlog = mlog.lock().unwrap();
55-
mlog.append(&bytes)?;
56-
if self.fsync {
57-
mlog.sync_all()?;
58-
let mut odb = self.odb.lock().unwrap();
59-
odb.sync_all()?;
60-
log::trace!("DATABASE: FSYNC");
61-
} else {
62-
mlog.flush()?;
63-
}
64-
}
65-
Ok(Some(bytes.len()))
59+
self.append_commit_bytes(&bytes).map(Some)
6660
} else {
6761
Ok(None)
6862
}
6963
}
7064

65+
// For testing -- doesn't require a `MutTxDatastore`, which is currently
66+
// unused anyway.
67+
fn append_commit_bytes(&self, commit: &[u8]) -> Result<usize, DBError> {
68+
if let Some(mlog) = &self.mlog {
69+
let mut mlog = mlog.lock().unwrap();
70+
mlog.append(commit)?;
71+
if self.fsync {
72+
mlog.sync_all()?;
73+
let mut odb = self.odb.lock().unwrap();
74+
odb.sync_all()?;
75+
log::trace!("DATABASE: FSYNC");
76+
} else {
77+
mlog.flush()?;
78+
}
79+
}
80+
Ok(commit.len())
81+
}
82+
7183
fn generate_commit<D: MutTxDatastore<RowId = RowId>>(&self, tx_data: &TxData, _datastore: &D) -> Option<Vec<u8>> {
7284
// We are not creating a commit for empty transactions.
7385
// The reason for this is that empty transactions get encoded as 0 bytes,
@@ -121,3 +133,236 @@ impl CommitLog {
121133
}
122134
}
123135
}
136+
137+
/// A read-only view of a [`CommitLog`].
138+
pub struct CommitLogView {
139+
mlog: Option<Arc<Mutex<MessageLog>>>,
140+
odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,
141+
}
142+
143+
impl CommitLogView {
144+
/// Obtain an iterator over a snapshot of the raw message log segments.
145+
///
146+
/// See also: [`MessageLog::segments`]
147+
pub fn message_log_segments(&self) -> message_log::Segments {
148+
self.message_log_segments_from(0)
149+
}
150+
151+
/// Obtain an iterator over a snapshot of the raw message log segments
152+
/// containing messages equal to or newer than `offset`.
153+
///
154+
/// See [`MessageLog::segments_from`] for more information.
155+
pub fn message_log_segments_from(&self, offset: u64) -> message_log::Segments {
156+
if let Some(mlog) = &self.mlog {
157+
let mlog = mlog.lock().unwrap();
158+
mlog.segments_from(offset)
159+
} else {
160+
message_log::Segments::empty()
161+
}
162+
}
163+
164+
/// Obtain an iterator over the [`Commit`]s in the log.
165+
///
166+
/// The iterator represents a snapshot of the log.
167+
pub fn iter(&self) -> Iter {
168+
self.iter_from(0)
169+
}
170+
171+
/// Obtain an iterator over the [`Commit`]s in the log, starting at `offset`.
172+
///
173+
/// The iterator represents a snapshot of the log.
174+
///
175+
/// Note that [`Commit`]s with an offset _smaller_ than `offset` may be
176+
/// yielded if the offset doesn't fall on a segment boundary, due to the
177+
/// lack of slicing support.
178+
///
179+
/// See [`MessageLog::segments_from`] for more information.
180+
pub fn iter_from(&self, offset: u64) -> Iter {
181+
self.message_log_segments_from(offset).into()
182+
}
183+
184+
/// Obtain an iterator over the large objects in [`Commit`], if any.
185+
///
186+
/// Large objects are stored in the [`ObjectDB`], and are referenced from
187+
/// the transactions in a [`Commit`].
188+
///
189+
/// The iterator attempts to read each large object in turn, yielding an
190+
/// [`io::Error`] with kind [`io::ErrorKind::NotFound`] if the object was
191+
/// not found.
192+
//
193+
// TODO(kim): We probably want a more efficient way to stream the contents
194+
// of the ODB over the network for replication purposes.
195+
pub fn commit_objects<'a>(&self, commit: &'a Commit) -> impl Iterator<Item = io::Result<bytes::Bytes>> + 'a {
196+
fn hashes(tx: &Arc<Transaction>) -> impl Iterator<Item = Hash> + '_ {
197+
tx.writes.iter().filter_map(|write| {
198+
if let DataKey::Hash(h) = write.data_key {
199+
Some(h)
200+
} else {
201+
None
202+
}
203+
})
204+
}
205+
206+
let odb = self.odb.clone();
207+
commit.transactions.iter().flat_map(hashes).map(move |hash| {
208+
let odb = odb.lock().unwrap();
209+
odb.get(hash)
210+
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, format!("Missing object: {hash}")))
211+
})
212+
}
213+
}
214+
215+
impl From<&CommitLog> for CommitLogView {
216+
fn from(log: &CommitLog) -> Self {
217+
Self {
218+
mlog: log.mlog.clone(),
219+
odb: log.odb.clone(),
220+
}
221+
}
222+
}
223+
224+
#[must_use = "iterators are lazy and do nothing unless consumed"]
225+
struct IterSegment {
226+
inner: message_log::IterSegment,
227+
}
228+
229+
impl Iterator for IterSegment {
230+
type Item = io::Result<Commit>;
231+
232+
fn next(&mut self) -> Option<Self::Item> {
233+
let next = self.inner.next()?;
234+
Some(next.map(|bytes| {
235+
// It seems very improbable that `decode` is infallible...
236+
let (commit, _) = Commit::decode(bytes);
237+
commit
238+
}))
239+
}
240+
}
241+
242+
/// Iterator over a [`CommitLogView`], yielding [`Commit`]s.
243+
///
244+
/// Created by [`CommitLogView::iter`] and [`CommitLogView::iter_from`]
245+
/// respectively.
246+
#[must_use = "iterators are lazy and do nothing unless consumed"]
247+
pub struct Iter {
248+
commits: Option<IterSegment>,
249+
segments: message_log::Segments,
250+
}
251+
252+
impl Iterator for Iter {
253+
type Item = io::Result<Commit>;
254+
255+
fn next(&mut self) -> Option<Self::Item> {
256+
loop {
257+
if let Some(mut commits) = self.commits.take() {
258+
if let Some(commit) = commits.next() {
259+
self.commits = Some(commits);
260+
return Some(commit);
261+
}
262+
}
263+
264+
let segment = self.segments.next()?;
265+
match segment.try_into_iter() {
266+
Err(e) => return Some(Err(e)),
267+
Ok(inner) => {
268+
self.commits = Some(IterSegment { inner });
269+
}
270+
}
271+
}
272+
}
273+
}
274+
275+
impl From<message_log::Segments> for Iter {
276+
fn from(segments: message_log::Segments) -> Self {
277+
Self {
278+
commits: None,
279+
segments,
280+
}
281+
}
282+
}
283+
284+
#[cfg(test)]
285+
mod tests {
286+
use super::*;
287+
288+
use spacetimedb_lib::data_key::InlineData;
289+
use tempdir::TempDir;
290+
291+
use crate::db::ostorage::memory_object_db::MemoryObjectDB;
292+
293+
#[test]
294+
fn test_iter_commits() {
295+
let tmp = TempDir::new("commit_log_test").unwrap();
296+
297+
let data_key = DataKey::Data(InlineData::from_bytes(b"asdf").unwrap());
298+
let tx = Transaction {
299+
writes: vec![
300+
Write {
301+
operation: Operation::Insert,
302+
set_id: 42,
303+
data_key,
304+
},
305+
Write {
306+
operation: Operation::Delete,
307+
set_id: 42,
308+
data_key,
309+
},
310+
],
311+
};
312+
313+
// The iterator doesn't verify integrity of commits, so we can just
314+
// write the same one repeatedly.
315+
let commit = Commit {
316+
parent_commit_hash: None,
317+
commit_offset: 0,
318+
min_tx_offset: 0,
319+
transactions: vec![Arc::new(tx)],
320+
};
321+
let mut commit_bytes = Vec::new();
322+
commit.encode(&mut commit_bytes);
323+
324+
const COMMITS_PER_SEGMENT: usize = 10_000;
325+
const TOTAL_MESSAGES: usize = (COMMITS_PER_SEGMENT * 3) - 1;
326+
let segment_size: usize = COMMITS_PER_SEGMENT * (commit_bytes.len() + 4);
327+
328+
let mlog = message_log::MessageLog::options()
329+
.max_segment_size(segment_size as u64)
330+
.open(tmp.path())
331+
.unwrap();
332+
let odb = MemoryObjectDB::default();
333+
334+
let log = CommitLog::new(
335+
Some(Arc::new(Mutex::new(mlog))),
336+
Arc::new(Mutex::new(Box::new(odb))),
337+
Commit {
338+
parent_commit_hash: None,
339+
commit_offset: 0,
340+
min_tx_offset: 0,
341+
transactions: Vec::new(),
342+
},
343+
true, // fsync
344+
);
345+
346+
for _ in 0..TOTAL_MESSAGES {
347+
log.append_commit_bytes(&commit_bytes).unwrap();
348+
}
349+
350+
let view = CommitLogView::from(&log);
351+
let commits = view.iter().map(Result::unwrap).count();
352+
assert_eq!(TOTAL_MESSAGES, commits);
353+
354+
let commits = view.iter_from(1_000_000).map(Result::unwrap).count();
355+
assert_eq!(0, commits);
356+
357+
// No slicing yet, so offsets on segment boundaries yield an additional
358+
// COMMITS_PER_SEGMENT.
359+
let commits = view.iter_from(20_001).map(Result::unwrap).count();
360+
assert_eq!(9999, commits);
361+
362+
let commits = view.iter_from(10_001).map(Result::unwrap).count();
363+
assert_eq!(19_999, commits);
364+
365+
let commits = view.iter_from(10_000).map(Result::unwrap).count();
366+
assert_eq!(29_999, commits);
367+
}
368+
}

0 commit comments

Comments
 (0)