@@ -22,9 +22,12 @@ use crate::arrow::array_reader::ArrayReader;
2222use crate :: arrow:: arrow_reader:: {
2323 ArrowPredicate , ParquetRecordBatchReader , RowSelection , RowSelector ,
2424} ;
25+ use crate :: arrow:: ProjectionMask ;
2526use crate :: errors:: { ParquetError , Result } ;
26- use arrow_array:: Array ;
27- use arrow_select:: filter:: prep_null_mask_filter;
27+ use arrow_array:: { Array , ArrayRef , BooleanArray , RecordBatch , RecordBatchReader } ;
28+ use arrow_schema:: { DataType , Schema } ;
29+ use arrow_select:: concat:: concat;
30+ use arrow_select:: filter:: { filter, prep_null_mask_filter} ;
2831use std:: collections:: VecDeque ;
2932
3033/// A builder for [`ReadPlan`]
@@ -35,7 +38,8 @@ pub(crate) struct ReadPlanBuilder {
3538 batch_size : usize ,
3639 /// Current to apply, includes all filters
3740 selection : Option < RowSelection > ,
38- // TODO: Cached result of evaluating some columns with the RowSelection
41+ /// Cached result of evaluating some columns with the RowSelection
42+ cached_predicate_result : Option < CachedPredicateResult > ,
3943}
4044
4145impl ReadPlanBuilder {
@@ -46,6 +50,7 @@ impl ReadPlanBuilder {
4650 Self {
4751 batch_size,
4852 selection : None ,
53+ cached_predicate_result : None ,
4954 }
5055 }
5156
@@ -88,41 +93,60 @@ impl ReadPlanBuilder {
8893
8994 /// Evaluates an [`ArrowPredicate`], updating the read plan's selection
9095 ///
96+ /// # Arguments
97+ ///
98+ /// * `array_reader`: The array reader to use for evaluating the predicate.
99+ /// must be configured with the projection mask specified by
100+ /// [`ArrowPredicate::projection`] for the `predicate`.
101+ ///
102+ /// * `predicate`: The predicate to evaluate
103+ ///
104+ /// * `projection`: The projection mask that will be selected. This code will
105+ /// potentially cache the results of filtering columns that also appear in the
106+ /// projection mask.
107+ ///
91108 /// If `this.selection` is `Some`, the resulting [`RowSelection`] will be
92- /// the conjunction of it and the rows selected by `predicate`.
109+ /// the conjunction of it and the rows selected by `predicate` (they will be
110+ /// `AND`ed).
93111 ///
94- /// Note: A pre-existing selection may come from evaluating a previous predicate
95- /// or if the [`ParquetRecordBatchReader`] specified an explicit
112+ /// Note: A pre-existing selection may come from evaluating a previous
113+ /// predicate or if the [`ParquetRecordBatchReader`] specifies an explicit
96114 /// [`RowSelection`] in addition to one or more predicates.
97115 pub ( crate ) fn with_predicate (
98116 mut self ,
99117 array_reader : Box < dyn ArrayReader > ,
100118 predicate : & mut dyn ArrowPredicate ,
119+ projection_mask : & ProjectionMask ,
101120 ) -> Result < Self > {
121+ // Prepare to decode all rows in the selection to evaluate the predicate
102122 let reader = ParquetRecordBatchReader :: new ( array_reader, self . clone ( ) . build ( ) ) ;
103- let mut filters = vec ! [ ] ;
123+ let mut cached_results_builder = CachedPredicateResultBuilder :: new (
124+ & reader. schema ( ) ,
125+ predicate. projection ( ) ,
126+ projection_mask,
127+ ) ;
104128 for maybe_batch in reader {
105- let maybe_batch = maybe_batch?;
106- let input_rows = maybe_batch . num_rows ( ) ;
107- let filter = predicate. evaluate ( maybe_batch ) ?;
129+ let batch = maybe_batch?;
130+ let input_rows = batch . num_rows ( ) ;
131+ let filter = predicate. evaluate ( batch . clone ( ) ) ?;
108132 // Since user supplied predicate, check error here to catch bugs quickly
109133 if filter. len ( ) != input_rows {
110134 return Err ( arrow_err ! (
111135 "ArrowPredicate predicate returned {} rows, expected {input_rows}" ,
112136 filter. len( )
113137 ) ) ;
114138 }
115- match filter. null_count ( ) {
116- 0 => filters. push ( filter) ,
117- _ => filters. push ( prep_null_mask_filter ( & filter) ) ,
118- } ;
139+ cached_results_builder. add ( batch, filter) ?;
119140 }
120141
121- let raw = RowSelection :: from_filters ( & filters) ;
142+ let ( raw, cached_predicate_result) =
143+ cached_results_builder. build ( self . batch_size , predicate. projection ( ) ) ?;
122144 self . selection = match self . selection . take ( ) {
123145 Some ( selection) => Some ( selection. and_then ( & raw ) ) ,
124146 None => Some ( raw) ,
125147 } ;
148+
149+ self . cached_predicate_result = cached_predicate_result;
126150 Ok ( self )
127151 }
128152
@@ -135,13 +159,15 @@ impl ReadPlanBuilder {
135159 let Self {
136160 batch_size,
137161 selection,
162+ cached_predicate_result,
138163 } = self ;
139164
140165 let selection = selection. map ( |s| s. trim ( ) . into ( ) ) ;
141166
142167 ReadPlan {
143168 batch_size,
144169 selection,
170+ cached_predicate_result,
145171 }
146172 }
147173}
@@ -238,7 +264,8 @@ pub(crate) struct ReadPlan {
238264 batch_size : usize ,
239265 /// Row ranges to be selected from the data source
240266 selection : Option < VecDeque < RowSelector > > ,
241- // TODO: Cached result of evaluating some columns with the RowSelection
267+ /// Cached result of evaluating some columns with the RowSelection
268+ cached_predicate_result : Option < CachedPredicateResult > ,
242269}
243270
244271impl ReadPlan {
@@ -252,3 +279,237 @@ impl ReadPlan {
252279 self . batch_size
253280 }
254281}
282+
283+ /// Incrementally builds the result of evaluating a ArrowPredicate on
284+ /// a RowGroup.
285+ struct CachedPredicateResultBuilder {
286+ /// The entire result of the predicate evaluation in memory
287+ ///
288+ /// TODO: potentially incrementally build the result of the predicate
289+ /// evaluation without holding all the batches in memory. See
290+ /// <https://github.com/apache/arrow-rs/issues/6692>
291+ in_progress_arrays : Vec < Box < dyn InProgressArray > > ,
292+ filters : Vec < BooleanArray > ,
293+ }
294+
295+ impl CachedPredicateResultBuilder {
296+ /// Create a new CachedPredicateResultBuilder
297+ ///
298+ /// # Arguments:
299+ /// * `schema`: The schema of the filter record batch
300+ /// * `filter_mask`: which columns of the original parquet schema did the filter columns come from?
301+ /// * `projection_mask`: which columns of the original parquet schema are in the final projection?
302+ ///
303+ /// This structure does not cache filter results for the columns that are not
304+ /// in the projection mask. This is because the filter results are not needed
305+ fn new (
306+ schema : & Schema ,
307+ filter_mask : & ProjectionMask ,
308+ projection_mask : & ProjectionMask ,
309+ ) -> Self {
310+ let mut field_iter = schema. fields . iter ( ) ;
311+
312+ let ( filter_mask_inner, projection_mask_inner) =
313+ match ( filter_mask. mask ( ) , projection_mask. mask ( ) ) {
314+ ( Some ( filter_mask) , Some ( projection_mask) ) => ( filter_mask, projection_mask) ,
315+ // NB, None means all columns and we just want the intersection of the two
316+ ( Some ( filter_mask) , None ) => ( filter_mask, filter_mask) ,
317+ ( None , Some ( projection_mask) ) => ( projection_mask, projection_mask) ,
318+ ( None , None ) => {
319+ // this means all columns are in the projection and filter so cache them all when possible
320+ let in_progress_arrays = field_iter
321+ . into_iter ( )
322+ . map ( |field| create_in_progress_array ( true , field. data_type ( ) ) )
323+ . collect ( ) ;
324+ return {
325+ Self {
326+ in_progress_arrays,
327+ filters : vec ! [ ] ,
328+ }
329+ } ;
330+ }
331+ } ;
332+
333+ let mut in_progress_arrays = Vec :: with_capacity ( filter_mask_inner. len ( ) ) ;
334+
335+ for ( & in_filter, & in_projection) in
336+ filter_mask_inner. iter ( ) . zip ( projection_mask_inner. iter ( ) )
337+ {
338+ if !in_filter {
339+ continue ;
340+ }
341+ // field is in the filter
342+ let field = field_iter. next ( ) . expect ( "mismatch in field lengths" ) ;
343+ in_progress_arrays. push ( create_in_progress_array ( in_projection, field. data_type ( ) ) ) ;
344+ }
345+ assert_eq ! ( in_progress_arrays. len( ) , schema. fields( ) . len( ) ) ;
346+
347+ Self {
348+ in_progress_arrays,
349+ filters : vec ! [ ] ,
350+ }
351+ }
352+
353+ /// Add a new batch and filter to the builder
354+ fn add ( & mut self , batch : RecordBatch , mut filter : BooleanArray ) -> Result < ( ) > {
355+ if filter. null_count ( ) > 0 {
356+ filter = prep_null_mask_filter ( & filter) ;
357+ }
358+
359+ let ( _schema, columns, _row_count) = batch. into_parts ( ) ;
360+
361+ for ( in_progress, array) in self . in_progress_arrays . iter_mut ( ) . zip ( columns. into_iter ( ) ) {
362+ in_progress. append ( array, & filter) ?;
363+ }
364+
365+ self . filters . push ( filter) ;
366+ Ok ( ( ) )
367+ }
368+
369+ /// Return (selection, maybe_cached_predicate_result) that represents the rows
370+ /// that were selected and batches that were evaluated.
371+ fn build (
372+ self ,
373+ _batch_size : usize ,
374+ filter_mask : & ProjectionMask ,
375+ ) -> Result < ( RowSelection , Option < CachedPredicateResult > ) > {
376+ let Self {
377+ in_progress_arrays,
378+ filters,
379+ } = self ;
380+
381+ let new_selection = RowSelection :: from_filters ( & filters) ;
382+
383+ let Some ( mask) = filter_mask. mask ( ) else {
384+ return Ok ( ( new_selection, None ) ) ;
385+ } ;
386+
387+ let mut arrays: Vec < Option < ArrayRef > > = vec ! [ None ; mask. len( ) ] ;
388+ let mut in_progress_arrays = VecDeque :: from ( in_progress_arrays) ;
389+
390+ // Now find the location in the original parquet schema of the filter columns
391+ for i in 0 ..mask. len ( ) {
392+ if mask[ i] {
393+ let mut in_progress = in_progress_arrays
394+ . pop_front ( )
395+ . expect ( "insufficient inprogress arrays" ) ;
396+ arrays[ i] = in_progress. try_build ( ) ?;
397+ }
398+ }
399+
400+ let cached_result = CachedPredicateResult { arrays, filters } ;
401+
402+ Ok ( ( new_selection, Some ( cached_result) ) )
403+ }
404+ }
405+
406+ /// The result of evaluating a predicate on a RowGroup with a specific
407+ /// RowSelection
408+ #[ derive( Clone ) ]
409+ struct CachedPredicateResult {
410+ /// Map of parquet schema column index to the result of evaluating the predicate
411+ /// on that column.
412+ ///
413+ /// NOTE each array already has the corresponding filters applied
414+ ///
415+ /// TODO: store as Vec<Vec<ArrayRef>> to avoid having to have one large
416+ /// array for each column
417+ arrays : Vec < Option < ArrayRef > > ,
418+ /// The results of evaluating the predicate (this has already been applied to the
419+ /// cached results)
420+ filters : Vec < BooleanArray > ,
421+ }
422+
423+ impl CachedPredicateResult {
424+ fn empty ( ) -> Self {
425+ Self {
426+ arrays : vec ! [ ] ,
427+ filters : vec ! [ ] ,
428+ }
429+ }
430+
431+ /// Apply the results of other to self
432+ /// Updates cached filter result and filters potentially
433+ pub fn merge ( self , other : Self ) -> Self {
434+ // TODO do something with self
435+ other
436+ }
437+ }
438+
439+ /// Progressively creates array from filtered values
440+ ///
441+ /// TODO avoid buffering the input memory
442+ trait InProgressArray {
443+ /// Appends all values of the array to the in progress array at locations where filter[i] is true
444+ /// to the in progress array
445+ fn append ( & mut self , _array : ArrayRef , filter : & BooleanArray ) -> Result < ( ) > ;
446+
447+ /// Builds the final array, consuming all state from self. Returns None if the array
448+ /// cannot be created (e.g. data type not supported or out of buffer space)
449+ fn try_build ( & mut self ) -> Result < Option < ArrayRef > > ;
450+ }
451+
452+ /// Return a new InProgressArray for the given data type
453+ ///
454+ /// if `in_projection` is false then a NoOpInProgressArray is returned (will not
455+ /// actually cache arrays results)
456+ fn create_in_progress_array (
457+ in_projection : bool ,
458+ _data_type : & DataType ,
459+ ) -> Box < dyn InProgressArray > {
460+ if in_projection {
461+ Box :: new ( GenericInProgressArray :: new ( ) )
462+ } else {
463+ // column is not in the projection, so no need to cache
464+ Box :: new ( NoOpInProgressArray { } )
465+ }
466+ }
467+
468+ /// Placeholder that does nothing until we support the entire set of datatypes
469+ struct NoOpInProgressArray { }
470+
471+ impl InProgressArray for NoOpInProgressArray {
472+ fn append ( & mut self , _array : ArrayRef , _filter : & BooleanArray ) -> Result < ( ) > {
473+ // do nothing
474+ Ok ( ( ) )
475+ }
476+ fn try_build ( & mut self ) -> Result < Option < ArrayRef > > {
477+ // do nothing
478+ Ok ( None )
479+ }
480+ }
481+
482+ /// a generic implementation of InProgressArray that uses filter and concat kernels
483+ /// to create the final array
484+ ///
485+ /// TODO: make this better with per type implementations
486+ /// <https://github.com/apache/arrow-rs/issues/6692>
487+ struct GenericInProgressArray {
488+ /// previously filtered arrays
489+ arrays : Vec < ArrayRef > ,
490+ }
491+
492+ impl GenericInProgressArray {
493+ fn new ( ) -> Self {
494+ Self { arrays : vec ! [ ] }
495+ }
496+ }
497+ impl InProgressArray for GenericInProgressArray {
498+ fn append ( & mut self , array : ArrayRef , filter_array : & BooleanArray ) -> Result < ( ) > {
499+ self . arrays . push ( filter ( & array, filter_array) ?) ;
500+ Ok ( ( ) )
501+ }
502+
503+ fn try_build ( & mut self ) -> Result < Option < ArrayRef > > {
504+ if self . arrays . is_empty ( ) {
505+ return Ok ( None ) ;
506+ }
507+ if self . arrays . len ( ) == 1 {
508+ return Ok ( Some ( self . arrays . pop ( ) . unwrap ( ) ) ) ;
509+ }
510+ // Vomit: need to copy to a new Vec to get dyn array
511+ let arrays: Vec < & dyn Array > = self . arrays . iter ( ) . map ( |a| a. as_ref ( ) ) . collect ( ) ;
512+ let array = concat ( & arrays) ?;
513+ Ok ( Some ( array) )
514+ }
515+ }
0 commit comments