From 59f82a9947f1a712300b24e787185b2eb674ca85 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 13 May 2025 17:41:30 +0800 Subject: [PATCH 1/6] Add outline for evaluation batch --- parquet/src/arrow/arrow_reader/mod.rs | 70 +++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2f670a64e108..b897a7aa19ab 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -18,15 +18,17 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] use arrow_array::cast::AsArray; -use arrow_array::Array; +use arrow_array::{Array, ArrayRef, BooleanArray}; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -use arrow_select::filter::prep_null_mask_filter; +use arrow_select::filter::{filter, filter_record_batch, prep_null_mask_filter}; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::collections::VecDeque; use std::sync::Arc; - +use arrow::compute::and; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use arrow_select::concat::concat; pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; @@ -1015,6 +1017,68 @@ pub(crate) fn evaluate_predicate( }) } +fn evaluate_predicate_batch( + batch_size: usize, + mut filter_reader: ParquetRecordBatchReader, + mut predicates: Vec>, +) -> Result { + let mut passing = Vec::with_capacity(batch_size); + let mut total_selected = 0; + let mut batches = Vec::new(); + while total_selected < batch_size { + match filter_reader.next() { + Some(Ok(batch)) => { + // Apply predicates sequentially and combine with AND + let mut combined_mask: Option = None; + + for predicate in predicates.iter_mut() { + let mask = predicate.evaluate(batch.clone())?; + if mask.len() != batch.num_rows() { + return Err(ArrowError::ComputeError(format!( + "Predicate returned {} rows, expected {}", + mask.len(), + batch.num_rows() + ))); + } + combined_mask = match combined_mask { + Some(prev) => Some(and(&prev, &mask)?), + None => Some(mask), + }; + } + + if let Some(mask) = combined_mask { + batches.push(filter_record_batch( + &batch, + &mask)); + total_selected += mask.true_count(); + passing.push(mask); + } else { + let len = batch.num_rows(); + let buffer = BooleanBuffer::new_set(len); + let mask = BooleanArray::new(buffer, None); + total_selected += len; + passing.push(mask); + } + } + Some(Err(e)) => return Err(e), + None => break, + } + } + let arrays: Vec = passing + .into_iter() + .map(|b| Arc::new(b) as ArrayRef) + .collect(); + + let combined = concat(&arrays).unwrap(); + let boolean_combined = combined + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + + Ok(boolean_combined) +} + #[cfg(test)] mod tests { use std::cmp::min; From 97f9099ff05e4ae912a936f505d6378c3b9d1975 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 14 May 2025 17:09:31 +0800 Subject: [PATCH 2/6] Draft poc for unified filter decoder --- parquet/src/arrow/arrow_reader/mod.rs | 538 ++++++++++++++++++-------- parquet/src/arrow/async_reader/mod.rs | 80 ++-- parquet/src/arrow/mod.rs | 20 + 3 files changed, 463 insertions(+), 175 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index b897a7aa19ab..573187bfc5db 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -26,7 +26,6 @@ pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::collections::VecDeque; use std::sync::Arc; -use arrow::compute::and; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::concat::concat; pub use crate::arrow::array_reader::RowGroups; @@ -682,7 +681,37 @@ impl ParquetRecordBatchReaderBuilder { let mut filter = self.filter; let mut selection = self.selection; + let mut projection = self.projection; + + + let predicate_projection = filter + .as_mut() + .map(|filter| { + filter + .predicates + .iter_mut() + .map(|p| p.projection().clone()) + .reduce(|mut acc, p| { + acc.union(&p); + acc + }) + }) + .flatten(); + + let projection_to_cache = predicate_projection.as_ref().map(|p| { + let mut p = p.clone(); + p.intersect(&projection); + p + }); + let project_exclude_filter = predicate_projection.as_ref().map(|p| { + let mut rest = projection.clone(); + rest.subtract(p); + rest + }).or_else(|| Some(projection.clone())); + + + let mut filter_readers = vec![]; if let Some(filter) = filter.as_mut() { for predicate in filter.predicates.iter_mut() { if !selects_any(selection.as_ref()) { @@ -692,26 +721,28 @@ impl ParquetRecordBatchReaderBuilder { let array_reader = build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?; - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + filter_readers.push(array_reader); } } - let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?; - // If selection is empty, truncate if !selects_any(selection.as_ref()) { selection = Some(RowSelection::from(vec![])); } - Ok(ParquetRecordBatchReader::new( + + let filter_reader = build_array_reader(self.fields.as_deref(), predicate_projection.as_ref().unwrap(), &reader)?; + + + selection = apply_range(selection, reader.num_rows(), self.offset, self.limit); + + Ok(ParquetRecordBatchReader::new( batch_size, - array_reader, - apply_range(selection, reader.num_rows(), self.offset, self.limit), + build_array_reader(self.fields.as_deref(), project_exclude_filter.as_ref().unwrap(), &reader)?, + filter_readers, + filter, + projection_to_cache, + selection, )) } } @@ -793,66 +824,194 @@ impl PageIterator for ReaderPageIterator {} pub struct ParquetRecordBatchReader { batch_size: usize, array_reader: Box, + filter_readers: Vec>, + row_filter: Option, schema: SchemaRef, + cached_mask: Option, selection: Option>, } + +/// Take the next selection from the selection queue, and return the selection +/// whose selected row count is to_select or less (if input selection is exhausted). +fn take_next_selection( + selection: &mut VecDeque, + to_select: usize, +) -> Option { + let mut current_selected = 0; + let mut rt = Vec::new(); + while let Some(front) = selection.pop_front() { + if front.skip { + rt.push(front); + continue; + } + + if current_selected + front.row_count <= to_select { + rt.push(front); + current_selected += front.row_count; + } else { + let select = to_select - current_selected; + let remaining = front.row_count - select; + rt.push(RowSelector::select(select)); + selection.push_front(RowSelector::select(remaining)); + + return Some(rt.into()); + } + } + if !rt.is_empty() { + return Some(rt.into()); + } + None +} + + + impl Iterator for ParquetRecordBatchReader { type Item = Result; fn next(&mut self) -> Option { - let mut read_records = 0; - match self.selection.as_mut() { - Some(selection) => { - while read_records < self.batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = match self.array_reader.skip_records(front.row_count) { - Ok(skipped) => skipped, - Err(e) => return Some(Err(e.into())), - }; - if skipped != front.row_count { - return Some(Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - ) - .into())); + let mut current_selected = 0; + + let mut current_selections: Vec = vec![]; + while current_selected < self.batch_size { + let Some(selection) = self.selection.as_mut() else { + break; + }; + + let Some(mut raw_sel) = take_next_selection(selection, self.batch_size) else { + break; + }; + + let selection: Result = match &mut self.row_filter { + None => Ok(raw_sel), + Some(filter) => { + debug_assert_eq!( + self.filter_readers.len(), + filter.predicates.len(), + "predicate readers and predicates should have the same length" + ); + + for (predicate, reader) in filter + .predicates + .iter_mut() + .zip(self.filter_readers.iter_mut()) + { + let array = read_selection(reader.as_mut(), &raw_sel); + let batch = RecordBatch::from(array.unwrap().as_struct_opt().ok_or_else(|| { + general_err!("Struct array reader should return struct array") + }).unwrap()); + let input_rows = batch.num_rows(); + let predicate_filter = predicate.evaluate(batch).unwrap(); + if predicate_filter.len() != input_rows { + return Some(Err(ArrowError::ParquetError(format!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + predicate_filter.len() + )))); } - continue; + let predicate_filter = match predicate_filter.null_count() { + 0 => predicate_filter, + _ => prep_null_mask_filter(&predicate_filter), + }; + let raw = RowSelection::from_filters(&[predicate_filter]); + raw_sel = raw_sel.and_then(&raw); } + Ok(raw_sel) + } + }; - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; - } + current_selected += selection.as_ref().unwrap().row_count(); + current_selections.push(selection.unwrap()); + } - // try to read record - let need_read = self.batch_size - read_records; - let to_read = match front.row_count.checked_sub(need_read) { - Some(remaining) if remaining != 0 => { - // if page row count less than batch_size we must set batch size to page row count. - // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); - need_read - } - _ => front.row_count, + for selection in &mut current_selections { + for selector in selection.iter() { + if selector.skip { + let skipped = match self.array_reader.skip_records(selector.row_count) { + Ok(skipped) => skipped, + Err(e) => return Some(Err(e.into())), }; - match self.array_reader.read_records(to_read) { - Ok(0) => break, - Ok(rec) => read_records += rec, - Err(error) => return Some(Err(error.into())), + + if skipped != selector.row_count { + return Some(Err(general_err!( + "failed to skip rows, expected {}, got {}", + selector.row_count, + skipped + ) + .into())); } + continue; } - } - None => { - if let Err(error) = self.array_reader.read_records(self.batch_size) { - return Some(Err(error.into())); + + let read = match self.array_reader.read_records(selector.row_count) { + Ok(read) => read, + Err(e) => return Some(Err(e.into())), + }; + + if read != selector.row_count { + return Some(Err(general_err!( + "failed to read rows, expected {}, got {}", + selector.row_count, + read + ) + .into())); } + } - }; + } + + // let mut read_records = 0; + // match self.selection.as_mut() { + // Some(selection) => { + // while read_records < self.batch_size && !selection.is_empty() { + // let front = selection.pop_front().unwrap(); + // if front.skip { + // let skipped = match self.array_reader.skip_records(front.row_count) { + // Ok(skipped) => skipped, + // Err(e) => return Some(Err(e.into())), + // }; + // + // if skipped != front.row_count { + // return Some(Err(general_err!( + // "failed to skip rows, expected {}, got {}", + // front.row_count, + // skipped + // ) + // .into())); + // } + // continue; + // } + // + // //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. + // //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 + // if front.row_count == 0 { + // continue; + // } + // + // // try to read record + // let need_read = self.batch_size - read_records; + // let to_read = match front.row_count.checked_sub(need_read) { + // Some(remaining) if remaining != 0 => { + // // if page row count less than batch_size we must set batch size to page row count. + // // add check avoid dead loop + // selection.push_front(RowSelector::select(remaining)); + // need_read + // } + // _ => front.row_count, + // }; + // match self.array_reader.read_records(to_read) { + // Ok(0) => break, + // Ok(rec) => read_records += rec, + // Err(error) => return Some(Err(error.into())), + // } + // } + // } + // None => { + // if let Err(error) = self.array_reader.read_records(self.batch_size) { + // return Some(Err(error.into())); + // } + // } + // }; match self.array_reader.consume_batch() { Err(error) => Some(Err(error.into())), @@ -908,6 +1067,9 @@ impl ParquetRecordBatchReader { Ok(Self { batch_size, array_reader, + filter_readers: vec![], + row_filter: None, + cached_mask: None, schema: Arc::new(Schema::new(levels.fields.clone())), selection: selection.map(|s| s.trim().into()), }) @@ -918,7 +1080,14 @@ impl ParquetRecordBatchReader { /// all rows will be returned pub(crate) fn new( batch_size: usize, + // finial project columns exclude the filter columns array_reader: Box, + // filter columns reader + filter_readers: Vec>, + // row filters + row_filter: Option, + // Cached project mask + cached_mask: Option, selection: Option, ) -> Self { let schema = match array_reader.get_data_type() { @@ -929,10 +1098,57 @@ impl ParquetRecordBatchReader { Self { batch_size, array_reader, + filter_readers, + row_filter, schema: Arc::new(schema), + cached_mask, selection: selection.map(|s| s.trim().into()), } } + + + /// Take a selection, and return the new selection where the rows are filtered by the predicate. + fn build_predicate_filter( + &mut self, + mut selection: RowSelection, + ) -> Result { + match &mut self.row_filter { + None => Ok(selection), + Some(filter) => { + // debug_assert_eq!( + // self.predicate_readers.len(), + // filter.predicates.len(), + // "predicate readers and predicates should have the same length" + // ); + + for (predicate, reader) in filter + .predicates + .iter_mut() + .zip(self.filter_readers.iter_mut()) + { + let array = read_selection(reader.as_mut(), &selection)?; + let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| { + general_err!("Struct array reader should return struct array") + })?); + let input_rows = batch.num_rows(); + let predicate_filter = predicate.evaluate(batch)?; + if predicate_filter.len() != input_rows { + return Err(ArrowError::ParquetError(format!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + predicate_filter.len() + ))); + } + let predicate_filter = match predicate_filter.null_count() { + 0 => predicate_filter, + _ => prep_null_mask_filter(&predicate_filter), + }; + let raw = RowSelection::from_filters(&[predicate_filter]); + selection = selection.and_then(&raw); + } + Ok(selection) + } + } + } } /// Returns `true` if `selection` is `None` or selects some rows @@ -975,109 +1191,125 @@ pub(crate) fn apply_range( selection } -/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating -/// which rows to return. -/// -/// `input_selection`: Optional pre-existing selection. If `Some`, then the -/// final [`RowSelection`] will be the conjunction of it and the rows selected -/// by `predicate`. -/// -/// Note: A pre-existing selection may come from evaluating a previous predicate -/// or if the [`ParquetRecordBatchReader`] specified an explicit -/// [`RowSelection`] in addition to one or more predicates. -pub(crate) fn evaluate_predicate( - batch_size: usize, - array_reader: Box, - input_selection: Option, - predicate: &mut dyn ArrowPredicate, -) -> Result { - let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); - let mut filters = vec![]; - for maybe_batch in reader { - let maybe_batch = maybe_batch?; - let input_rows = maybe_batch.num_rows(); - let filter = predicate.evaluate(maybe_batch)?; - // Since user supplied predicate, check error here to catch bugs quickly - if filter.len() != input_rows { - return Err(arrow_err!( - "ArrowPredicate predicate returned {} rows, expected {input_rows}", - filter.len() - )); +fn read_selection( + reader: &mut dyn ArrayReader, + selection: &RowSelection, +) -> Result { + for selector in selection.iter() { + if selector.skip { + let skipped = reader.skip_records(selector.row_count)?; + debug_assert_eq!(skipped, selector.row_count, "failed to skip rows"); + } else { + let read_records = reader.read_records(selector.row_count)?; + debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); } - match filter.null_count() { - 0 => filters.push(filter), - _ => filters.push(prep_null_mask_filter(&filter)), - }; } - - let raw = RowSelection::from_filters(&filters); - Ok(match input_selection { - Some(selection) => selection.and_then(&raw), - None => raw, - }) + reader.consume_batch() } -fn evaluate_predicate_batch( - batch_size: usize, - mut filter_reader: ParquetRecordBatchReader, - mut predicates: Vec>, -) -> Result { - let mut passing = Vec::with_capacity(batch_size); - let mut total_selected = 0; - let mut batches = Vec::new(); - while total_selected < batch_size { - match filter_reader.next() { - Some(Ok(batch)) => { - // Apply predicates sequentially and combine with AND - let mut combined_mask: Option = None; - - for predicate in predicates.iter_mut() { - let mask = predicate.evaluate(batch.clone())?; - if mask.len() != batch.num_rows() { - return Err(ArrowError::ComputeError(format!( - "Predicate returned {} rows, expected {}", - mask.len(), - batch.num_rows() - ))); - } - combined_mask = match combined_mask { - Some(prev) => Some(and(&prev, &mask)?), - None => Some(mask), - }; - } - - if let Some(mask) = combined_mask { - batches.push(filter_record_batch( - &batch, - &mask)); - total_selected += mask.true_count(); - passing.push(mask); - } else { - let len = batch.num_rows(); - let buffer = BooleanBuffer::new_set(len); - let mask = BooleanArray::new(buffer, None); - total_selected += len; - passing.push(mask); - } - } - Some(Err(e)) => return Err(e), - None => break, - } - } - let arrays: Vec = passing - .into_iter() - .map(|b| Arc::new(b) as ArrayRef) - .collect(); - - let combined = concat(&arrays).unwrap(); - let boolean_combined = combined - .as_any() - .downcast_ref::() - .unwrap() - .clone(); - - Ok(boolean_combined) -} +// /// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating +// /// which rows to return. +// /// +// /// `input_selection`: Optional pre-existing selection. If `Some`, then the +// /// final [`RowSelection`] will be the conjunction of it and the rows selected +// /// by `predicate`. +// /// +// /// Note: A pre-existing selection may come from evaluating a previous predicate +// /// or if the [`ParquetRecordBatchReader`] specified an explicit +// /// [`RowSelection`] in addition to one or more predicates. +// pub(crate) fn evaluate_predicate( +// batch_size: usize, +// array_reader: Box, +// input_selection: Option, +// predicate: &mut dyn ArrowPredicate, +// ) -> Result { +// let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); +// let mut filters = vec![]; +// for maybe_batch in reader { +// let maybe_batch = maybe_batch?; +// let input_rows = maybe_batch.num_rows(); +// let filter = predicate.evaluate(maybe_batch)?; +// // Since user supplied predicate, check error here to catch bugs quickly +// if filter.len() != input_rows { +// return Err(arrow_err!( +// "ArrowPredicate predicate returned {} rows, expected {input_rows}", +// filter.len() +// )); +// } +// match filter.null_count() { +// 0 => filters.push(filter), +// _ => filters.push(prep_null_mask_filter(&filter)), +// }; +// } +// +// let raw = RowSelection::from_filters(&filters); +// Ok(match input_selection { +// Some(selection) => selection.and_then(&raw), +// None => raw, +// }) +// } + +// fn evaluate_predicate_batch( +// batch_size: usize, +// mut filter_reader: ParquetRecordBatchReader, +// mut predicates: Vec>, +// ) -> Result { +// let mut passing = Vec::with_capacity(batch_size); +// let mut total_selected = 0; +// let mut batches = Vec::new(); +// while total_selected < batch_size { +// match filter_reader.next() { +// Some(Ok(batch)) => { +// // Apply predicates sequentially and combine with AND +// let mut combined_mask: Option = None; +// +// for predicate in predicates.iter_mut() { +// let mask = predicate.evaluate(batch.clone())?; +// if mask.len() != batch.num_rows() { +// return Err(ArrowError::ComputeError(format!( +// "Predicate returned {} rows, expected {}", +// mask.len(), +// batch.num_rows() +// ))); +// } +// combined_mask = match combined_mask { +// Some(prev) => Some(and(&prev, &mask)?), +// None => Some(mask), +// }; +// } +// +// if let Some(mask) = combined_mask { +// batches.push(filter_record_batch( +// &batch, +// &mask)); +// total_selected += mask.true_count(); +// passing.push(mask); +// } else { +// let len = batch.num_rows(); +// let buffer = BooleanBuffer::new_set(len); +// let mask = BooleanArray::new(buffer, None); +// total_selected += len; +// passing.push(mask); +// } +// } +// Some(Err(e)) => return Err(e), +// None => break, +// } +// } +// let arrays: Vec = passing +// .into_iter() +// .map(|b| Arc::new(b) as ArrayRef) +// .collect(); +// +// let combined = concat(&arrays).unwrap(); +// let boolean_combined = combined +// .as_any() +// .downcast_ref::() +// .unwrap() +// .clone(); +// +// Ok(boolean_combined) +// } #[cfg(test)] mod tests { diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 45df68821ca8..02f7fcbd2f5a 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -40,7 +40,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, + apply_range, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -586,28 +586,33 @@ where metadata: self.metadata.as_ref(), }; - if let Some(filter) = self.filter.as_mut() { - for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { - return Ok((self, None)); - } - let predicate_projection = predicate.projection(); - row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) - .await?; + let predicate_projection = self.filter + .as_mut() + .map(|filter| { + filter + .predicates + .iter_mut() + .map(|p| p.projection().clone()) + .reduce(|mut acc, p| { + acc.union(&p); + acc + }) + }) + .flatten(); + + let projection_to_cache = predicate_projection.as_ref().map(|p| { + let mut p = p.clone(); + p.intersect(&projection); + p + }); - let array_reader = - build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; + let project_exclude_filter = predicate_projection.as_ref().map(|p| { + let mut rest = projection.clone(); + rest.subtract(p); + rest + }).or_else(|| Some(projection.clone())); - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); - } - } // Compute the number of rows in the selection before applying limit and offset let rows_before = selection @@ -642,13 +647,44 @@ where *limit -= rows_after; } + let mut filter_readers = vec![]; + + if let Some(filter) = self.filter.as_mut() { + for predicate in filter.predicates.iter_mut() { + if !selects_any(selection.as_ref()) { + return Ok((self, None)); + } + + let predicate_projection = predicate.projection(); + + row_group + .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .await?; + + let array_reader = build_array_reader( + self.fields.as_deref(), + predicate_projection, + &row_group, + )?; + + filter_readers.push(array_reader); + } + } + + // let filter_reader = build_array_reader(self.fields.as_deref(), predicate_projection.as_ref().unwrap(), &row_group)?; + + + // Fetch the data pages for the row group which is the final project excluding the filter row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &project_exclude_filter.as_ref().unwrap(), selection.as_ref()) .await?; let reader = ParquetRecordBatchReader::new( batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, + build_array_reader(self.fields.as_deref(), project_exclude_filter.as_ref().unwrap(), &row_group)?, + filter_readers, + self.filter.take(), + projection_to_cache, selection, ); diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 76f8ef1bf068..cee362a46863 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -407,6 +407,26 @@ impl ProjectionMask { } } } + + /// Subtract two projection masks + /// + /// Example: + /// ```text + /// mask1 = [true, false, true] + /// mask2 = [false, true, true] + /// subtract(mask1, mask2) = [true, false, false] + /// ``` + pub fn subtract(&mut self, other: &Self) { + match (self.mask.as_ref(), other.mask.as_ref()) { + (None, _) => {} + (_, None) => {} + (Some(a), Some(b)) => { + debug_assert_eq!(a.len(), b.len()); + let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && !b).collect(); + self.mask = Some(mask); + } + } + } } /// Lookups up the parquet column by name From 2391c2c070a1265c92ac7935c8df267c3cc01b30 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 14 May 2025 17:12:50 +0800 Subject: [PATCH 3/6] Clean up code --- parquet/src/arrow/arrow_reader/mod.rs | 157 -------------------------- 1 file changed, 157 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 573187bfc5db..38e50b45b35a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -960,59 +960,6 @@ impl Iterator for ParquetRecordBatchReader { } } - // let mut read_records = 0; - // match self.selection.as_mut() { - // Some(selection) => { - // while read_records < self.batch_size && !selection.is_empty() { - // let front = selection.pop_front().unwrap(); - // if front.skip { - // let skipped = match self.array_reader.skip_records(front.row_count) { - // Ok(skipped) => skipped, - // Err(e) => return Some(Err(e.into())), - // }; - // - // if skipped != front.row_count { - // return Some(Err(general_err!( - // "failed to skip rows, expected {}, got {}", - // front.row_count, - // skipped - // ) - // .into())); - // } - // continue; - // } - // - // //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - // //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - // if front.row_count == 0 { - // continue; - // } - // - // // try to read record - // let need_read = self.batch_size - read_records; - // let to_read = match front.row_count.checked_sub(need_read) { - // Some(remaining) if remaining != 0 => { - // // if page row count less than batch_size we must set batch size to page row count. - // // add check avoid dead loop - // selection.push_front(RowSelector::select(remaining)); - // need_read - // } - // _ => front.row_count, - // }; - // match self.array_reader.read_records(to_read) { - // Ok(0) => break, - // Ok(rec) => read_records += rec, - // Err(error) => return Some(Err(error.into())), - // } - // } - // } - // None => { - // if let Err(error) = self.array_reader.read_records(self.batch_size) { - // return Some(Err(error.into())); - // } - // } - // }; - match self.array_reader.consume_batch() { Err(error) => Some(Err(error.into())), Ok(array) => { @@ -1207,110 +1154,6 @@ fn read_selection( reader.consume_batch() } -// /// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating -// /// which rows to return. -// /// -// /// `input_selection`: Optional pre-existing selection. If `Some`, then the -// /// final [`RowSelection`] will be the conjunction of it and the rows selected -// /// by `predicate`. -// /// -// /// Note: A pre-existing selection may come from evaluating a previous predicate -// /// or if the [`ParquetRecordBatchReader`] specified an explicit -// /// [`RowSelection`] in addition to one or more predicates. -// pub(crate) fn evaluate_predicate( -// batch_size: usize, -// array_reader: Box, -// input_selection: Option, -// predicate: &mut dyn ArrowPredicate, -// ) -> Result { -// let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); -// let mut filters = vec![]; -// for maybe_batch in reader { -// let maybe_batch = maybe_batch?; -// let input_rows = maybe_batch.num_rows(); -// let filter = predicate.evaluate(maybe_batch)?; -// // Since user supplied predicate, check error here to catch bugs quickly -// if filter.len() != input_rows { -// return Err(arrow_err!( -// "ArrowPredicate predicate returned {} rows, expected {input_rows}", -// filter.len() -// )); -// } -// match filter.null_count() { -// 0 => filters.push(filter), -// _ => filters.push(prep_null_mask_filter(&filter)), -// }; -// } -// -// let raw = RowSelection::from_filters(&filters); -// Ok(match input_selection { -// Some(selection) => selection.and_then(&raw), -// None => raw, -// }) -// } - -// fn evaluate_predicate_batch( -// batch_size: usize, -// mut filter_reader: ParquetRecordBatchReader, -// mut predicates: Vec>, -// ) -> Result { -// let mut passing = Vec::with_capacity(batch_size); -// let mut total_selected = 0; -// let mut batches = Vec::new(); -// while total_selected < batch_size { -// match filter_reader.next() { -// Some(Ok(batch)) => { -// // Apply predicates sequentially and combine with AND -// let mut combined_mask: Option = None; -// -// for predicate in predicates.iter_mut() { -// let mask = predicate.evaluate(batch.clone())?; -// if mask.len() != batch.num_rows() { -// return Err(ArrowError::ComputeError(format!( -// "Predicate returned {} rows, expected {}", -// mask.len(), -// batch.num_rows() -// ))); -// } -// combined_mask = match combined_mask { -// Some(prev) => Some(and(&prev, &mask)?), -// None => Some(mask), -// }; -// } -// -// if let Some(mask) = combined_mask { -// batches.push(filter_record_batch( -// &batch, -// &mask)); -// total_selected += mask.true_count(); -// passing.push(mask); -// } else { -// let len = batch.num_rows(); -// let buffer = BooleanBuffer::new_set(len); -// let mask = BooleanArray::new(buffer, None); -// total_selected += len; -// passing.push(mask); -// } -// } -// Some(Err(e)) => return Err(e), -// None => break, -// } -// } -// let arrays: Vec = passing -// .into_iter() -// .map(|b| Arc::new(b) as ArrayRef) -// .collect(); -// -// let combined = concat(&arrays).unwrap(); -// let boolean_combined = combined -// .as_any() -// .downcast_ref::() -// .unwrap() -// .clone(); -// -// Ok(boolean_combined) -// } - #[cfg(test)] mod tests { use std::cmp::min; From f06170779c544b0f16ef0268bb3d48272ed191f8 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 14 May 2025 18:01:33 +0800 Subject: [PATCH 4/6] Fix code --- parquet/src/arrow/arrow_reader/mod.rs | 68 ++++----------------------- parquet/src/arrow/async_reader/mod.rs | 9 ++-- 2 files changed, 15 insertions(+), 62 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 38e50b45b35a..560169ef17a2 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -875,8 +875,15 @@ impl Iterator for ParquetRecordBatchReader { let mut current_selections: Vec = vec![]; while current_selected < self.batch_size { - let Some(selection) = self.selection.as_mut() else { - break; + let selection: &mut VecDeque = match self.selection.as_mut() { + Some(s) => s, + None => { + self.selection = Some( + std::iter::once(RowSelector::select(self.batch_size)) + .collect::>(), + ); + self.selection.as_mut().unwrap() + } }; let Some(mut raw_sel) = take_next_selection(selection, self.batch_size) else { @@ -943,20 +950,10 @@ impl Iterator for ParquetRecordBatchReader { continue; } - let read = match self.array_reader.read_records(selector.row_count) { + match self.array_reader.read_records(selector.row_count) { Ok(read) => read, Err(e) => return Some(Err(e.into())), }; - - if read != selector.row_count { - return Some(Err(general_err!( - "failed to read rows, expected {}, got {}", - selector.row_count, - read - ) - .into())); - } - } } @@ -1052,52 +1049,7 @@ impl ParquetRecordBatchReader { selection: selection.map(|s| s.trim().into()), } } - - - /// Take a selection, and return the new selection where the rows are filtered by the predicate. - fn build_predicate_filter( - &mut self, - mut selection: RowSelection, - ) -> Result { - match &mut self.row_filter { - None => Ok(selection), - Some(filter) => { - // debug_assert_eq!( - // self.predicate_readers.len(), - // filter.predicates.len(), - // "predicate readers and predicates should have the same length" - // ); - - for (predicate, reader) in filter - .predicates - .iter_mut() - .zip(self.filter_readers.iter_mut()) - { - let array = read_selection(reader.as_mut(), &selection)?; - let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| { - general_err!("Struct array reader should return struct array") - })?); - let input_rows = batch.num_rows(); - let predicate_filter = predicate.evaluate(batch)?; - if predicate_filter.len() != input_rows { - return Err(ArrowError::ParquetError(format!( - "ArrowPredicate predicate returned {} rows, expected {input_rows}", - predicate_filter.len() - ))); - } - let predicate_filter = match predicate_filter.null_count() { - 0 => predicate_filter, - _ => prep_null_mask_filter(&predicate_filter), - }; - let raw = RowSelection::from_filters(&[predicate_filter]); - selection = selection.and_then(&raw); - } - Ok(selection) - } - } - } } - /// Returns `true` if `selection` is `None` or selects some rows pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool { selection.map(|x| x.selects_any()).unwrap_or(true) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 02f7fcbd2f5a..e2fbfebf8799 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -39,10 +39,7 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; -use crate::arrow::arrow_reader::{ - apply_range, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, -}; +use crate::arrow::arrow_reader::{apply_range, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, RowSelector}; use crate::arrow::ProjectionMask; use crate::bloom_filter::{ @@ -614,6 +611,10 @@ where }).or_else(|| Some(projection.clone())); + if selection.is_none() { + selection = Some(RowSelection::from(vec![RowSelector::select(row_group.row_count)])); + } + // Compute the number of rows in the selection before applying limit and offset let rows_before = selection .as_ref() From f785cd9b6f04d9044a13c28dc85ddc4a68094b71 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 14 May 2025 23:05:42 +0800 Subject: [PATCH 5/6] Fix exclude project --- parquet/src/arrow/arrow_reader/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 560169ef17a2..c84ac168a217 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -704,7 +704,7 @@ impl ParquetRecordBatchReaderBuilder { p }); - let project_exclude_filter = predicate_projection.as_ref().map(|p| { + let project_exclude_filter = projection_to_cache.as_ref().map(|p| { let mut rest = projection.clone(); rest.subtract(p); rest @@ -968,7 +968,10 @@ impl Iterator for ParquetRecordBatchReader { match struct_array { Err(err) => Some(Err(err)), - Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))), + Ok(e) => { + // println!("e.len() = {}", e.len()); + (e.len() > 0).then(|| Ok(RecordBatch::from(e))) + }, } } } From 2061ebe6b5c8ae1c8cfc1d8bd7c6ae2f6d2c870d Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 15 May 2025 12:46:21 +0800 Subject: [PATCH 6/6] Fix the predicate evaluate for filters --- parquet/src/arrow/arrow_reader/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index c84ac168a217..201ccf9141a3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -899,6 +899,7 @@ impl Iterator for ParquetRecordBatchReader { "predicate readers and predicates should have the same length" ); + let mut final_select = raw_sel.clone(); for (predicate, reader) in filter .predicates .iter_mut() @@ -921,9 +922,13 @@ impl Iterator for ParquetRecordBatchReader { _ => prep_null_mask_filter(&predicate_filter), }; let raw = RowSelection::from_filters(&[predicate_filter]); - raw_sel = raw_sel.and_then(&raw); + final_select = final_select.and_then(&raw); + + if !final_select.selects_any() { + break + } } - Ok(raw_sel) + Ok(final_select) } };