From 6d096524f67afb4f7b598d9e99bf5cefc65fa23b Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 11 Sep 2023 14:45:44 +0200 Subject: [PATCH] Remove `MessageLogIter`, use `commit_log::Iter` instead --- crates/core/src/db/message_log.rs | 75 ----------------------------- crates/core/src/db/relational_db.rs | 6 ++- 2 files changed, 4 insertions(+), 77 deletions(-) diff --git a/crates/core/src/db/message_log.rs b/crates/core/src/db/message_log.rs index 91259a1698e..9e0e6fce76b 100644 --- a/crates/core/src/db/message_log.rs +++ b/crates/core/src/db/message_log.rs @@ -239,18 +239,6 @@ impl MessageLog { self.root.clone() } - pub fn iter(&self) -> MessageLogIter { - self.iter_from(0) - } - - pub fn iter_from(&self, start_offset: u64) -> MessageLogIter { - MessageLogIter { - offset: start_offset, - message_log: self, - open_segment_file: None, - } - } - /// Obtains an iterator over all segments in the log, in the order they were /// created. /// @@ -306,19 +294,6 @@ impl MessageLog { fn open_segment_mut(&mut self) -> &mut Segment { self.segments.last_mut().expect("at least one segment must exist") } - - fn segment_for_offset(&self, offset: u64) -> Option { - let prev = self.segments[0]; - for segment in &self.segments { - if segment.min_offset > offset { - return Some(prev); - } - } - if offset <= self.open_segment_max_offset { - return Some(*self.segments.last().unwrap()); - } - None - } } /// A read-only view of an on-disk [`Segment`] of the [`MessageLog`]. @@ -439,56 +414,6 @@ impl Iterator for Segments { } } -pub struct MessageLogIter<'a> { - offset: u64, - message_log: &'a MessageLog, - open_segment_file: Option>, -} - -impl<'a> Iterator for MessageLogIter<'a> { - type Item = Vec; - - fn next(&mut self) -> Option { - let open_segment_file: &mut BufReader; - if let Some(f) = &mut self.open_segment_file { - open_segment_file = f; - } else { - let segment = self.message_log.segment_for_offset(self.offset).unwrap(); - let file = fs::OpenOptions::new() - .read(true) - .open(self.message_log.root.join(segment.name() + ".log")) - .unwrap(); - let file = BufReader::new(file); - self.open_segment_file = Some(file); - open_segment_file = self.open_segment_file.as_mut().unwrap(); - } - - // TODO: use offset to jump to the right spot in the file - // open_segment_file.seek_relative(byte_offset(self.offset)); - - let mut buf = [0; HEADER_SIZE]; - if let Err(err) = open_segment_file.read_exact(&mut buf) { - match err.kind() { - std::io::ErrorKind::UnexpectedEof => return None, - _ => panic!("MessageLogIter: {:?}", err), - } - }; - let message_len = u32::from_le_bytes(buf); - - let mut buf = vec![0; message_len as usize]; - if let Err(err) = open_segment_file.read_exact(&mut buf) { - match err.kind() { - std::io::ErrorKind::UnexpectedEof => return None, - _ => panic!("MessageLogIter: {:?}", err), - } - } - - self.offset += 1; - - Some(buf) - } -} - #[cfg(test)] mod tests { use super::MessageLog; diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index e249cf49c9e..07204e54953 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -8,6 +8,7 @@ use super::message_log::MessageLog; use super::ostorage::memory_object_db::MemoryObjectDB; use super::relational_operators::Relation; use crate::address::Address; +use crate::db::commit_log; use crate::db::db_metrics::{RDB_DELETE_BY_REL_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_ITER_TIME}; use crate::db::messages::commit::Commit; use crate::db::ostorage::hashmap_object_db::HashMapObjectDB; @@ -101,9 +102,10 @@ impl RelationalDB { if let Some(message_log) = &message_log { let message_log = message_log.lock().unwrap(); let max_offset = message_log.open_segment_max_offset; - for message in message_log.iter() { + for commit in commit_log::Iter::from(message_log.segments()) { + let commit = commit?; + segment_index += 1; - let (commit, _) = Commit::decode(message); last_hash = commit.parent_commit_hash; last_commit_offset = Some(commit.commit_offset); for transaction in commit.transactions {