Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 68 additions & 18 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,15 @@ struct InMemoryRowGroup<'a> {
metadata: &'a ParquetMetaData,
}

/// What ranges to fetch for the columns in this row group
#[derive(Debug)]
struct FetchRanges {
/// The byte ranges to fetch
ranges: Vec<Range<u64>>,
/// If `Some`, the start offsets of each page for each column chunk
page_start_offsets: Option<Vec<Vec<u64>>>,
}

impl InMemoryRowGroup<'_> {
/// Fetches any additional column data specified in `projection` that is not already
/// present in `self.column_chunks`.
Expand All @@ -898,13 +907,32 @@ impl InMemoryRowGroup<'_> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
// Figure out what ranges to fetch
let FetchRanges {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just broke up fetch into three functions

ranges,
page_start_offsets,
} = self.fetch_ranges(projection, selection);
// do the actual fetch
let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
// update our in memory buffers (self.column_chunks) with the fetched data
self.fill_column_chunks(projection, page_start_offsets, chunk_data);
Ok(())
}

/// Returns the byte ranges to fetch for the columns specified in
/// `projection` and `selection`.
fn fetch_ranges(
&self,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> FetchRanges {
let metadata = self.metadata.row_group(self.row_group_idx);
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<u64>> = vec![];

let fetch_ranges = self
let ranges = self
.column_chunks
.iter()
.zip(metadata.columns())
Expand All @@ -930,8 +958,46 @@ impl InMemoryRowGroup<'_> {
ranges
})
.collect();
FetchRanges {
ranges,
page_start_offsets: Some(page_start_offsets),
}
} else {
let ranges = self
.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
})
.collect();
FetchRanges {
ranges,
page_start_offsets: None,
}
}
}

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
/// Fills in `self.column_chunks` with the data fetched from `chunk_data`.
///
/// This function **must** be called with the data from the ranges returned by
/// `fetch_ranges` and the corresponding page_start_offsets, with the exact same and `selection`.
fn fill_column_chunks<I>(
&mut self,
projection: &ProjectionMask,
page_start_offsets: Option<Vec<Vec<u64>>>,
chunk_data: I,
) where
I: IntoIterator<Item = Bytes>,
{
let mut chunk_data = chunk_data.into_iter();
let metadata = self.metadata.row_group(self.row_group_idx);
if let Some(page_start_offsets) = page_start_offsets {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets = page_start_offsets.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
Expand All @@ -956,20 +1022,6 @@ impl InMemoryRowGroup<'_> {
}
}
} else {
let fetch_ranges = self
.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
Expand All @@ -983,8 +1035,6 @@ impl InMemoryRowGroup<'_> {
}
}
}

Ok(())
}
}

Expand Down
Loading