From c881d34eff1eb8acfbf09f2be858a3df1fa37639 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 29 Jul 2025 18:02:44 -0400 Subject: [PATCH] [Parquet] Refactor InMemoryRowGroup to separate CPU and IO --- parquet/src/arrow/async_reader/mod.rs | 86 +++++++++++++++++++++------ 1 file changed, 68 insertions(+), 18 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 611d6999e07e..7a293811094e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -886,6 +886,15 @@ struct InMemoryRowGroup<'a> { metadata: &'a ParquetMetaData, } +/// What ranges to fetch for the columns in this row group +#[derive(Debug)] +struct FetchRanges { + /// The byte ranges to fetch + ranges: Vec>, + /// If `Some`, the start offsets of each page for each column chunk + page_start_offsets: Option>>, +} + impl InMemoryRowGroup<'_> { /// Fetches any additional column data specified in `projection` that is not already /// present in `self.column_chunks`. @@ -898,13 +907,32 @@ impl InMemoryRowGroup<'_> { projection: &ProjectionMask, selection: Option<&RowSelection>, ) -> Result<()> { + // Figure out what ranges to fetch + let FetchRanges { + ranges, + page_start_offsets, + } = self.fetch_ranges(projection, selection); + // do the actual fetch + let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); + // update our in memory buffers (self.column_chunks) with the fetched data + self.fill_column_chunks(projection, page_start_offsets, chunk_data); + Ok(()) + } + + /// Returns the byte ranges to fetch for the columns specified in + /// `projection` and `selection`. + fn fetch_ranges( + &self, + projection: &ProjectionMask, + selection: Option<&RowSelection>, + ) -> FetchRanges { let metadata = self.metadata.row_group(self.row_group_idx); if let Some((selection, offset_index)) = selection.zip(self.offset_index) { // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` let mut page_start_offsets: Vec> = vec![]; - let fetch_ranges = self + let ranges = self .column_chunks .iter() .zip(metadata.columns()) @@ -930,8 +958,46 @@ impl InMemoryRowGroup<'_> { ranges }) .collect(); + FetchRanges { + ranges, + page_start_offsets: Some(page_start_offsets), + } + } else { + let ranges = self + .column_chunks + .iter() + .enumerate() + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .map(|(idx, _chunk)| { + let column = metadata.column(idx); + let (start, length) = column.byte_range(); + start..(start + length) + }) + .collect(); + FetchRanges { + ranges, + page_start_offsets: None, + } + } + } - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + /// Fills in `self.column_chunks` with the data fetched from `chunk_data`. + /// + /// This function **must** be called with the data from the ranges returned by + /// `fetch_ranges` and the corresponding page_start_offsets, with the exact same and `selection`. + fn fill_column_chunks( + &mut self, + projection: &ProjectionMask, + page_start_offsets: Option>>, + chunk_data: I, + ) where + I: IntoIterator, + { + let mut chunk_data = chunk_data.into_iter(); + let metadata = self.metadata.row_group(self.row_group_idx); + if let Some(page_start_offsets) = page_start_offsets { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` let mut page_start_offsets = page_start_offsets.into_iter(); for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { @@ -956,20 +1022,6 @@ impl InMemoryRowGroup<'_> { } } } else { - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .map(|(idx, _chunk)| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start..(start + length) - }) - .collect(); - - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { if chunk.is_some() || !projection.leaf_included(idx) { continue; @@ -983,8 +1035,6 @@ impl InMemoryRowGroup<'_> { } } } - - Ok(()) } }