[Parquet] Adaptive Parquet Predicate Pushdown #8733
Conversation
Cargo bench results; emoji added for readability: 🟢 means not worse, 👍🏻 means a 20%+ improvement.
😮 thank you @hhhizzz -- I plan to review this PR carefully, but it will likely take me a few days
fyi @zhuqi-lucas and @XiangpengHao |
```rust
fn new(selectors: Vec<RowSelector>) -> Self {
    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
    let selector_count = selectors.len();
    const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 8;
```
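For context, the adaptive choice under discussion reduces to comparing the average selector run length against this threshold. A minimal sketch of that decision, assuming the `RowSelectionStrategy` enum used in the benchmarks below (the exact logic here is my reading of the snippet, not a verbatim copy of the PR):

```rust
// Illustrative sketch only: short average runs favor a bitmap mask (cheap
// random access), long runs favor RowSelectors (cheap bulk skips).
enum RowSelectionStrategy {
    Mask,      // materialize a boolean mask and filter decoded batches
    Selectors, // replay skip/read runs against the page decoder
}

fn choose_strategy(total_rows: usize, selector_count: usize) -> RowSelectionStrategy {
    const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 8;
    // Average run length = selected + skipped rows, divided by number of runs.
    if selector_count > 0 && total_rows / selector_count <= AVG_SELECTOR_LEN_MASK_THRESHOLD {
        RowSelectionStrategy::Mask
    } else {
        RowSelectionStrategy::Selectors
    }
}
```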
@alamb It looks similar to my original implementation, which used a fixed choice.
But it's more reasonable in this PR: I added a benchmark to determine this threshold value (8).
I added a bench in the code; you can also try it on your machine. I find it varies heavily across platforms: on my Mac it's 8, but on my x86 PC the value can be set to around 30.
Nice @hhhizzz, I am wondering if we can change to a more stable choice, such as a statistics-based one, but it's a good start for this PR.
That's a great idea; let me do some more investigation on different platforms and put the results here.
First of all, thank you so much @hhhizzz -- I think this is a really nice change and the code is well structured and a pleasure to read. Also thank you to @zhuqi-lucas for setting the stage for much of this work.
Given the performance results so far (basically as good as or better than the existing code) I think this PR is almost ready to go.
The only thing I am not sure about is the null page / skipping thing -- I left more comments inline
I think there are several additional improvements that could be done as follow on work:
- The heuristic for when to use the masking strategy can likely be improved based on the types of values being filtered (for example the number of columns or the inclusion of StringView)
- Avoid creating RowSelection just to turn it back into a BooleanArray (I left comments inline)
```rust
    false,
)]));
let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32));
let columns: Vec<ArrayRef> = vec![Arc::new(values) as ArrayRef];
```
I recommend we also test with some variable-length rows -- the selection overhead may be different for StringArray/StringViewArray than for an i32.
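A hedged sketch of what such a variable-length column could look like, mirroring the Int32 setup above (`string_view_column` is a hypothetical helper, not code from this PR):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, StringViewArray};

// Build a StringViewArray column with one distinct value per row, analogous
// to the Int32Array column in the existing benchmark setup.
fn string_view_column(total_rows: usize) -> Vec<ArrayRef> {
    let strings = StringViewArray::from_iter_values(
        (0..total_rows).map(|v| format!("row-{v:08}")),
    );
    vec![Arc::new(strings) as ArrayRef]
}
```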
| ("read_selectors", RowSelectionStrategy::Selectors), | ||
| ]; | ||
|
|
||
| fn criterion_benchmark(c: &mut Criterion) { |
I found this code quite clear and easy to read -- thank you 🙏
I do think it would be good if we could add some of the background context here as a comment.
Specifically, it is not obvious from just the code that this benchmark can be used to determine the value of AVG_SELECTOR_LEN_MASK_THRESHOLD -- perhaps you can reuse some of the description from this PR?
Also, how did you generate these charts? If it is straightforward perhaps you can also describe that in the comments
#8733 (comment)
```rust
let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
let selector_count = selectors.len();
const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16;
```
I recommend we pull this constant somewhere that is easier to find along with a comment about what it is and how it was chosen. I suggest simply making it a constant in this module
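For illustration, a documented module-level constant might look like the following (the wording is a sketch based on this discussion, not text from the PR):

```rust
/// Average selector run length at or below which the reader switches from
/// RowSelector-based skipping to a boolean-mask strategy.
///
/// Chosen empirically via the row-selection benchmark; the best value varies
/// by platform (8 on the author's Mac, around 30 on an x86 PC).
const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16;
```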
```rust
let mut cursor = start_position;
let mut initial_skip = 0;

while cursor < mask.len() && !mask.value(cursor) {
```
I suspect there are all sorts of bit-level hacks we can do to make this faster (as a follow-on PR) - for example leveraging code that counts the number of 1s in a u64 at a time.
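A sketch of the kind of word-at-a-time trick being suggested, operating directly on the mask's underlying `u64` words (hypothetical helper, not code from this PR):

```rust
// Find the first set bit at or after `start`, scanning whole u64 words
// instead of probing the mask one bit at a time.
fn first_set_bit_from(words: &[u64], start: usize, len: usize) -> Option<usize> {
    let mut bit = start;
    while bit < len {
        let word_idx = bit / 64;
        // Clear bits below `bit` within the current word (shift is < 64).
        let word = words[word_idx] & (u64::MAX << (bit % 64));
        if word != 0 {
            let found = word_idx * 64 + word.trailing_zeros() as usize;
            return (found < len).then_some(found);
        }
        // The rest of this word is zero; jump to the next word boundary.
        bit = (word_idx + 1) * 64;
    }
    None
}
```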
```rust
    }
}

fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
```
I think we can do even better than this (as a follow on PR)
The current code still converts the result of a filter (BooleanArray) to a RowSelection,
https://github.com/apache/arrow-rs/blob/cc1444a3232fa11b8485e2794a88f342bd7f97e2/parquet/src/arrow/arrow_reader/read_plan.rs#L113-L112
and then boolean_mask_from_selectors converts it back to a BooleanArray
However I think we could apply the result of evaluating the filter directly to a RowSelectionBacking::Mask
In fact, @XiangpengHao even has some (relatively crazy) techniques to combine masks quickly in #6624 (comment)
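For reference, the selector-to-mask leg of that round trip is roughly the following (a sketch assuming `RowSelector`'s public `row_count`/`skip` fields; the PR's actual `boolean_mask_from_selectors` may differ in detail):

```rust
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use parquet::arrow::arrow_reader::RowSelector;

// Expand run-length selectors into a dense boolean mask: selected runs
// become 1-bits, skipped runs become 0-bits.
fn mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
    let total: usize = selectors.iter().map(|s| s.row_count).sum();
    let mut builder = BooleanBufferBuilder::new(total);
    for s in selectors {
        builder.append_n(s.row_count, !s.skip);
    }
    builder.finish()
}
```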
parquet/src/column/reader.rs
Outdated
```rust
let remaining_records = max_records - total_records_read;
let remaining_levels = self.num_buffered_values - self.num_decoded_values;

if self.synthetic_page {
```
I don't understand the need for the synthetic page -- it seems like a workaround for some case that should be handled in the control flow loop in ParquetRecordBatchReader::next_inner
Specifically, given that skipping / scanning data pages works with the RowSelection approach, why does a mask approach cause additional problems? In some ways the mask approach should actually decode more rows, not fewer (as the filter is applied afterwards).
Thanks for the thorough code review!
Yes -- this is the trickiest part of the PR. When no pages are skipped, everything works as expected. But some pages can be skipped during row-group construction when using the sparse ColumnChunkData, meaning their values and definition/repetition levels are never read. Row selection still works because skip_records() handles this case and skips the page accordingly.
However, with the boolean-array design, all values must be read and decoded before filtering. ParquetRecordBatchReader is a streaming reader; it has no concept of pages, so we can't rely on page size to drive skipping there. I think the most practical approach, therefore, is to return dummy null values as placeholders for the skipped pages. If I missed something or there's a better way to do this, just let me know. 😊
A simple example:
- the page size is 2 and the mask is 100001, so the row selection would be read(1) skip(4) read(1)
- the ColumnChunkData would be page1(10), page2(skipped), page3(01)
- using the row selection to skip(4), page2 won't be read at all
- but using the bit mask, all 6 values need to be read, and page2 is not in memory, which is why I need to construct the synthetic page
For completeness, I prototyped reconstructing the readers to handle skipped pages directly, but it introduces a breaking change: every array_reader would need a page-size parameter. That's undesirable: users shouldn't need page-level details just to read Parquet.
I am similarly still confused.
@hhhizzz your explanation makes sense to me in theory, but I just tested out removing the synthetic page code from this PR and the tests all still seem to pass. So that means we either have a testing gap or there is something else going on:
I looked more carefully, and it seems to me that the calculation of what pages to fetch is still based on RowSelection (not the RowSelectionCursor / RowSelectionBacking):
arrow-rs/parquet/src/arrow/async_reader/mod.rs
Lines 983 to 995 in cc1444a
```rust
pub(crate) async fn fetch<T: AsyncFileReader + Send>(
    &mut self,
    input: &mut T,
    projection: &ProjectionMask,
    selection: Option<&RowSelection>,
    batch_size: usize,
    cache_mask: Option<&ProjectionMask>,
) -> Result<()> {
    // Figure out what ranges to fetch
    let FetchRanges {
        ranges,
        page_start_offsets,
    } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
```
Thus it does feel possible to have the situation you explain, where pages needed to evaluate the row selection weren't fetched 🤔
The error comes from my direct test on a Parquet file; let me add new tests for the scenario.
I have been thinking about how to test this scenario and have some ideas (it is probably time to do some fuzz testing / testing with very selective predicates). I hope to help write some additional tests later this week.
Thank you alamb, it looks like there's still something unresolved in this PR. I'm going to resolve it in the next few days. In the meantime I may update or rebase the branch multiple times, so I converted the PR to a draft.
The things left are:
- Add a benchmark for different value types to determine the final length at which to do the selection/bitmask conversion
- Add some guidance or a tool to draw the charts, so we can collect more statistics from different platforms
- For the design of the synthetic page, we all agree it's not a good idea; I need to find another method to handle the sparse page
- Add new tests to check whether the bitmask method can handle all kinds of skipped pages in a sparse column chunk
Thank you so much @hhhizzz -- this is super exciting and I will give top priority to reviewing this PR as you make changes.
parquet/src/column/reader.rs
Outdated
```rust
// Some writers omit data pages for sparse column chunks and encode the gap
// as a reader-visible error. Use the metadata peek to synthesise a page of
// null definition levels so downstream consumers see consistent row counts.
self.try_create_synthetic_page(metadata)?;
```
This feels very fragile and likely to result in weird record shredding bugs - https://github.com/apache/arrow-rs/pull/8733/files#r2483674920
Additionally I think it would imply that the predicate pushdown is "reversing" earlier forms of pushdown and relying on the IO implementation to have chosen to do a sparse read - this feels unfortunate
parquet/src/column/reader.rs
Outdated
```rust
if self.descr.max_rep_level() != 0 {
    return Err(general_err!(
        "cannot synthesise sparse page for column with repetition levels ({message})"
    ));
}

if self.descr.max_def_level() == 0 {
    return Err(general_err!(
        "cannot synthesise sparse page for required column ({message})"
    ));
}
```
I think this would mean we error if we try to push down on a column with repetition levels or on a required column - this seems like quite a major regression.
I took a quick look. Whilst I think orchestrating this skipping at the RecordReader level does have a certain elegance, it runs into the issue that the masked selections aren't necessarily page-aligned.
By definition the mask selection strategy requests rows that weren't part of the original selection; the problem is that this could result in requesting rows for pages that we know are irrelevant. In some cases this just results in wasted IO; however, when using prefetching IO systems (such as AsyncParquetReader) it results in errors. I'm not a big fan of the hack of creating empty pages.
I think a better solution would be to ensure we only construct MaskChunks that don't cross page boundaries. Ideally this would be done on a per-leaf-column basis, but tbh I suspect just doing it globally would probably work just fine.
Edit: If one was feeling fancy, one could ignore page boundaries where both pages were present in the original selection, although in practice I suspect this would not make a huge difference.
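A rough sketch of the suggested clamping, with hypothetical names (`page_offsets` would come from the offset index; the real chunk construction would then emit one piece per returned range):

```rust
// Split a mask chunk [chunk_start, chunk_start + chunk_len) at every data
// page boundary that falls inside it, so no piece crosses a page boundary.
fn split_at_page_boundaries(
    chunk_start: usize,
    chunk_len: usize,
    page_offsets: &[usize], // row offsets where data pages start
) -> Vec<(usize, usize)> {
    let chunk_end = chunk_start + chunk_len;
    let mut cuts: Vec<usize> = page_offsets
        .iter()
        .copied()
        .filter(|&o| o > chunk_start && o < chunk_end)
        .collect();
    cuts.push(chunk_end);

    let mut pieces = Vec::new();
    let mut start = chunk_start;
    for cut in cuts {
        pieces.push((start, cut - start));
        start = cut;
    }
    pieces
}
```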


Which issue does this PR close?
Rationale for this change
This change improves the performance of reading Parquet files.
What changes are included in this PR?
This pull request introduces significant improvements to row selection and filtering in the Parquet Arrow reader, optimizing batch reading and handling of sparse data. The most important changes include a new mask-based row selection state, enhancements to synthetic page handling, and expanded test coverage for these features.
Row selection and filtering improvements:
- Introduced RowSelectionState in read_plan.rs, which dynamically chooses between a bitmap mask array and a selector queue for efficient row selection during batch reads. This enables streaming with contiguous mask segments and reduces overhead for sparse selections.
- Updated ParquetRecordBatchReader to leverage the mask-based selection, streaming record batches using boolean masks and applying Arrow filtering for selected rows. This avoids intermediate materialization and improves performance for sparse row selections. If the average length of the RowSelectors is less than 8, they are replaced by a bitmap mask.

Synthetic page and definition level handling:
A challenge with the mask-based approach is that some pages may be skipped, and due to the streaming design of the reader, it’s not always possible to determine in advance which pages will be skipped.
To address this, additional logic was added to return None when a page is skipped, ensuring correct handling in such cases.
Together, these improvements enhance both efficiency and correctness in row selection, filtering, and sparse data processing for the Parquet Arrow reader.
Are these changes tested?
Are there any user-facing changes?
No