Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use spacetimedb_lib::{

use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{Mutex, MutexGuard};

#[derive(Clone)]
pub struct CommitLog {
Expand Down Expand Up @@ -56,33 +56,34 @@ impl CommitLog {
where
D: MutTxDatastore<RowId = RowId>,
{
if let Some(bytes) = self.generate_commit(tx_data, datastore) {
self.append_commit_bytes(&bytes).map(Some)
if let Some(mlog) = &self.mlog {
let mut mlog = mlog.lock().unwrap();
self.generate_commit(tx_data, datastore)
.as_deref()
.map(|bytes| self.append_commit_bytes(&mut mlog, bytes))
.transpose()
} else {
Ok(None)
}
}

// For testing -- doesn't require a `MutTxDatastore`, which is currently
// unused anyway.
fn append_commit_bytes(&self, commit: &[u8]) -> Result<usize, DBError> {
if let Some(mlog) = &self.mlog {
let mut mlog = mlog.lock().unwrap();
mlog.append(commit)?;
if self.fsync {
let offset = mlog.open_segment_max_offset;
// Sync the odb first, as the mlog depends on its data. This is
// not an atomicity guarantee, but the error context may help
// with forensics.
let mut odb = self.odb.lock().unwrap();
odb.sync_all()
.with_context(|| format!("Error syncing odb to disk. Log offset: {offset}"))?;
mlog.sync_all()
.with_context(|| format!("Error syncing mlog to disk. Log offset: {offset}"))?;
log::trace!("DATABASE: FSYNC");
} else {
mlog.flush()?;
}
fn append_commit_bytes(&self, mlog: &mut MutexGuard<'_, MessageLog>, commit: &[u8]) -> Result<usize, DBError> {
mlog.append(commit)?;
if self.fsync {
let offset = mlog.open_segment_max_offset;
// Sync the odb first, as the mlog depends on its data. This is
// not an atomicity guarantee, but the error context may help
// with forensics.
let mut odb = self.odb.lock().unwrap();
odb.sync_all()
.with_context(|| format!("Error syncing odb to disk. Log offset: {offset}"))?;
mlog.sync_all()
.with_context(|| format!("Error syncing mlog to disk. Log offset: {offset}"))?;
log::trace!("DATABASE: FSYNC");
} else {
mlog.flush()?;
}
Ok(commit.len())
}
Expand Down Expand Up @@ -350,8 +351,11 @@ mod tests {
true, // fsync
);

for _ in 0..TOTAL_MESSAGES {
log.append_commit_bytes(&commit_bytes).unwrap();
{
let mut guard = log.mlog.as_ref().unwrap().lock().unwrap();
for _ in 0..TOTAL_MESSAGES {
log.append_commit_bytes(&mut guard, &commit_bytes).unwrap();
}
}

let view = CommitLogView::from(&log);
Expand Down