Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 0 additions & 75 deletions crates/core/src/db/message_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,6 @@ impl MessageLog {
self.root.clone()
}

pub fn iter(&self) -> MessageLogIter {
self.iter_from(0)
}

pub fn iter_from(&self, start_offset: u64) -> MessageLogIter {
MessageLogIter {
offset: start_offset,
message_log: self,
open_segment_file: None,
}
}

/// Obtains an iterator over all segments in the log, in the order they were
/// created.
///
Expand Down Expand Up @@ -306,19 +294,6 @@ impl MessageLog {
fn open_segment_mut(&mut self) -> &mut Segment {
self.segments.last_mut().expect("at least one segment must exist")
}

fn segment_for_offset(&self, offset: u64) -> Option<Segment> {
let prev = self.segments[0];
for segment in &self.segments {
if segment.min_offset > offset {
return Some(prev);
}
}
if offset <= self.open_segment_max_offset {
return Some(*self.segments.last().unwrap());
}
None
}
}

/// A read-only view of an on-disk [`Segment`] of the [`MessageLog`].
Expand Down Expand Up @@ -439,56 +414,6 @@ impl Iterator for Segments {
}
}

pub struct MessageLogIter<'a> {
offset: u64,
message_log: &'a MessageLog,
open_segment_file: Option<BufReader<File>>,
}

impl<'a> Iterator for MessageLogIter<'a> {
type Item = Vec<u8>;

fn next(&mut self) -> Option<Self::Item> {
let open_segment_file: &mut BufReader<File>;
if let Some(f) = &mut self.open_segment_file {
open_segment_file = f;
} else {
let segment = self.message_log.segment_for_offset(self.offset).unwrap();
let file = fs::OpenOptions::new()
.read(true)
.open(self.message_log.root.join(segment.name() + ".log"))
.unwrap();
let file = BufReader::new(file);
self.open_segment_file = Some(file);
open_segment_file = self.open_segment_file.as_mut().unwrap();
}

// TODO: use offset to jump to the right spot in the file
// open_segment_file.seek_relative(byte_offset(self.offset));

let mut buf = [0; HEADER_SIZE];
if let Err(err) = open_segment_file.read_exact(&mut buf) {
match err.kind() {
std::io::ErrorKind::UnexpectedEof => return None,
_ => panic!("MessageLogIter: {:?}", err),
}
};
let message_len = u32::from_le_bytes(buf);

let mut buf = vec![0; message_len as usize];
if let Err(err) = open_segment_file.read_exact(&mut buf) {
match err.kind() {
std::io::ErrorKind::UnexpectedEof => return None,
_ => panic!("MessageLogIter: {:?}", err),
}
}

self.offset += 1;

Some(buf)
}
}

#[cfg(test)]
mod tests {
use super::MessageLog;
Expand Down
6 changes: 4 additions & 2 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::message_log::MessageLog;
use super::ostorage::memory_object_db::MemoryObjectDB;
use super::relational_operators::Relation;
use crate::address::Address;
use crate::db::commit_log;
use crate::db::db_metrics::{RDB_DELETE_BY_REL_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_ITER_TIME};
use crate::db::messages::commit::Commit;
use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
Expand Down Expand Up @@ -101,9 +102,10 @@ impl RelationalDB {
if let Some(message_log) = &message_log {
let message_log = message_log.lock().unwrap();
let max_offset = message_log.open_segment_max_offset;
for message in message_log.iter() {
for commit in commit_log::Iter::from(message_log.segments()) {
let commit = commit?;

segment_index += 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aasoni Unsure why this is named segment_index when it counts commits (i.e. messages). If it is supposed to count commits, this could be further simplified via .enumerate().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may be a question for @cloutiertyler, I've only rearranged things in this function and added debug logging. Not sure why the original naming decision

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay... I guess I'll just leave it like that, then. Although it looks to me like last_commit_offset would suffice to compute the percentage.

let (commit, _) = Commit::decode(message);
last_hash = commit.parent_commit_hash;
last_commit_offset = Some(commit.commit_offset);
for transaction in commit.transactions {
Expand Down