Skip to content

Commit cb32c83

Browse files
committed
core: Fix commit log concurrency
Previously, the commit payload to append to the message log was constructed without holding the lock on the log. This meant that commits could be written to the log out of order. Indeed, this was observed on deployed databases when verifying the hash chain (the parent hash is computed in `generate_commit`). To fix this, the lock is now acquired before the payload is generated and held until the message is written (and potentially fsync'ed).
1 parent 3838c89 commit cb32c83

File tree

1 file changed

+27
-23
lines changed

1 file changed

+27
-23
lines changed

crates/core/src/db/commit_log.rs

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use spacetimedb_lib::{
2323

2424
use std::io;
2525
use std::sync::Arc;
26-
use std::sync::Mutex;
26+
use std::sync::{Mutex, MutexGuard};
2727

2828
#[derive(Clone)]
2929
pub struct CommitLog {
@@ -56,33 +56,34 @@ impl CommitLog {
5656
where
5757
D: MutTxDatastore<RowId = RowId>,
5858
{
59-
if let Some(bytes) = self.generate_commit(tx_data, datastore) {
60-
self.append_commit_bytes(&bytes).map(Some)
59+
if let Some(mlog) = &self.mlog {
60+
let mut mlog = mlog.lock().unwrap();
61+
self.generate_commit(tx_data, datastore)
62+
.as_deref()
63+
.map(|bytes| self.append_commit_bytes(&mut mlog, bytes))
64+
.transpose()
6165
} else {
6266
Ok(None)
6367
}
6468
}
6569

6670
// For testing -- doesn't require a `MutTxDatastore`, which is currently
6771
// unused anyway.
68-
fn append_commit_bytes(&self, commit: &[u8]) -> Result<usize, DBError> {
69-
if let Some(mlog) = &self.mlog {
70-
let mut mlog = mlog.lock().unwrap();
71-
mlog.append(commit)?;
72-
if self.fsync {
73-
let offset = mlog.open_segment_max_offset;
74-
// Sync the odb first, as the mlog depends on its data. This is
75-
// not an atomicity guarantee, but the error context may help
76-
// with forensics.
77-
let mut odb = self.odb.lock().unwrap();
78-
odb.sync_all()
79-
.with_context(|| format!("Error syncing odb to disk. Log offset: {offset}"))?;
80-
mlog.sync_all()
81-
.with_context(|| format!("Error syncing mlog to disk. Log offset: {offset}"))?;
82-
log::trace!("DATABASE: FSYNC");
83-
} else {
84-
mlog.flush()?;
85-
}
72+
fn append_commit_bytes(&self, mlog: &mut MutexGuard<'_, MessageLog>, commit: &[u8]) -> Result<usize, DBError> {
73+
mlog.append(commit)?;
74+
if self.fsync {
75+
let offset = mlog.open_segment_max_offset;
76+
// Sync the odb first, as the mlog depends on its data. This is
77+
// not an atomicity guarantee, but the error context may help
78+
// with forensics.
79+
let mut odb = self.odb.lock().unwrap();
80+
odb.sync_all()
81+
.with_context(|| format!("Error syncing odb to disk. Log offset: {offset}"))?;
82+
mlog.sync_all()
83+
.with_context(|| format!("Error syncing mlog to disk. Log offset: {offset}"))?;
84+
log::trace!("DATABASE: FSYNC");
85+
} else {
86+
mlog.flush()?;
8687
}
8788
Ok(commit.len())
8889
}
@@ -350,8 +351,11 @@ mod tests {
350351
true, // fsync
351352
);
352353

353-
for _ in 0..TOTAL_MESSAGES {
354-
log.append_commit_bytes(&commit_bytes).unwrap();
354+
{
355+
let mut guard = log.mlog.as_ref().unwrap().lock().unwrap();
356+
for _ in 0..TOTAL_MESSAGES {
357+
log.append_commit_bytes(&mut guard, &commit_bytes).unwrap();
358+
}
355359
}
356360

357361
let view = CommitLogView::from(&log);

0 commit comments

Comments
 (0)