Skip to content

Commit 7c5d13c

Browse files
committed
core: Track the number of bytes read when iterating over the WAL
Mainly for providing additional error context when commit decoding fails.
1 parent 0a4c2f1 commit 7c5d13c

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

crates/core/src/db/commit_log.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,13 +274,27 @@ struct IterSegment {
274274
inner: message_log::IterSegment,
275275
}
276276

277+
impl IterSegment {
278+
fn bytes_read(&self) -> u64 {
279+
self.inner.bytes_read()
280+
}
281+
}
282+
277283
impl Iterator for IterSegment {
278284
type Item = io::Result<Commit>;
279285

280286
fn next(&mut self) -> Option<Self::Item> {
281287
let next = self.inner.next()?;
288+
289+
let ctx = || {
290+
format!(
291+
"Failed to decode commit in segment {:0>20} at byte offset: {}",
292+
self.inner.segment(),
293+
self.bytes_read()
294+
)
295+
};
282296
let io = |e| io::Error::new(io::ErrorKind::InvalidData, e);
283-
Some(next.and_then(|bytes| Commit::decode(&mut bytes.as_slice()).map_err(io)))
297+
Some(next.and_then(|bytes| Commit::decode(&mut bytes.as_slice()).with_context(ctx).map_err(io)))
284298
}
285299
}
286300

@@ -299,9 +313,8 @@ impl Iterator for Iter {
299313

300314
fn next(&mut self) -> Option<Self::Item> {
301315
loop {
302-
if let Some(mut commits) = self.commits.take() {
316+
if let Some(commits) = self.commits.as_mut() {
303317
if let Some(commit) = commits.next() {
304-
self.commits = Some(commits);
305318
return Some(commit);
306319
}
307320
}

crates/core/src/db/message_log.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,10 @@ impl TryFrom<SegmentView> for IterSegment {
335335
type Error = io::Error;
336336

337337
fn try_from(view: SegmentView) -> Result<Self, Self::Error> {
338+
let segment = view.offset();
338339
File::try_from(view)
339340
.map(BufReader::new)
340-
.map(|file| IterSegment { file })
341+
.map(|file| IterSegment { segment, read: 0, file })
341342
}
342343
}
343344

@@ -354,10 +355,25 @@ impl TryFrom<SegmentView> for File {
354355
/// Created by [`SegmentView::try_iter`].
355356
#[must_use = "iterators are lazy and do nothing unless consumed"]
356357
pub struct IterSegment {
358+
segment: u64,
359+
read: u64,
357360
file: BufReader<File>,
358361
}
359362

360363
impl IterSegment {
364+
/// Return the id of the segment being iterated over.
365+
///
366+
/// The segment id is the `min_offset`, but that information is not
367+
/// meaningful here -- the value returned should be treated as opaque.
368+
pub fn segment(&self) -> u64 {
369+
self.segment
370+
}
371+
372+
/// Return the number of bytes read from the segment file so far.
373+
pub fn bytes_read(&self) -> u64 {
374+
self.read
375+
}
376+
361377
fn read_exact_or_none(&mut self, buf: &mut [u8]) -> Option<io::Result<()>> {
362378
match self.file.read_exact(buf) {
363379
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
@@ -375,12 +391,14 @@ impl Iterator for IterSegment {
375391
if let Err(e) = self.read_exact_or_none(&mut buf)? {
376392
return Some(Err(e));
377393
}
394+
self.read += HEADER_SIZE as u64;
378395

379396
let message_len = u32::from_le_bytes(buf);
380397
let mut buf = vec![0; message_len as usize];
381398
if let Err(e) = self.read_exact_or_none(&mut buf)? {
382399
return Some(Err(e));
383400
}
401+
self.read += message_len as u64;
384402

385403
Some(Ok(buf))
386404
}

0 commit comments

Comments
 (0)