Rewrite ParquetRecordBatchStream in terms of the PushDecoder
#8159
Conversation
From my perspective, the goal of "show we can use the push decoder to rewrite the async decoder" is now complete. I will pick this PR up again once we get the push decoder merged.
# Which issue does this PR close?

- Part of #8000
- Closes #7983

# Rationale for this change

This PR is the first part of separating IO and decode operations in the Rust parquet decoder. Decoupling IO and CPU enables several important use cases:

1. Different IO patterns (e.g. not buffering the entire row group at once)
2. Different IO APIs, e.g. io_uring, OpenDAL, etc.
3. Deliberate prefetching within a file
4. Avoiding code duplication between the `ParquetRecordBatchStreamBuilder` and `ParquetRecordBatchReaderBuilder`

# What changes are included in this PR?

1. Add the new `ParquetDecoderBuilder` and `ParquetDecoder`, plus tests

It is effectively an explicit version of the state machine used in the existing async reader (where the state machine is encoded as Rust `async` / `await` structures).

# Are these changes tested?

Yes -- there are extensive tests for the new code.

Note that this PR actually adds a **3rd** path for control flow (when I claim this will remove duplication!). In follow-on PRs I will convert the existing readers to use this new pattern, similarly to the sequence I did for the metadata decoder:

- #8080
- #8340

Here is a preview of a PR that consolidates the async reader to use the push decoder internally (and removes one duplicate):

- #8159
- Closes #8022

# Are there any user-facing changes?

Yes, a new API, but no changes to the existing APIs.

---------

Co-authored-by: Matthijs Brobbel <[email protected]>
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
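As a rough illustration of the IO/decode split this enables (the trait, method names, and `DecodeResult` variants below are illustrative stand-ins, not the actual API added by that PR), the caller owns all IO and only hands bytes to the decoder when it asks for them:

```rust
use std::ops::Range;

// Illustrative stand-ins for a push-style decoder API; not the crate's real types.
enum DecodeResult<T> {
    /// The decoder cannot make progress without these byte ranges
    NeedsData(Vec<Range<u64>>),
    /// The decoder produced a decoded value
    Data(T),
    /// Nothing left to decode
    Finished,
}

trait PushDecode<T> {
    fn try_decode(&mut self) -> Result<DecodeResult<T>, String>;
    fn push_ranges(&mut self, ranges: Vec<Range<u64>>, data: Vec<Vec<u8>>);
}

// The caller performs the IO however it likes (buffered reads, io_uring,
// OpenDAL, prefetching, ...); the decoder itself only ever does CPU work.
fn drive<T>(
    decoder: &mut dyn PushDecode<T>,
    mut fetch: impl FnMut(&[Range<u64>]) -> Vec<Vec<u8>>,
) -> Result<Vec<T>, String> {
    let mut out = Vec::new();
    loop {
        match decoder.try_decode()? {
            DecodeResult::NeedsData(ranges) => {
                let data = fetch(&ranges);
                decoder.push_ranges(ranges, data);
            }
            DecodeResult::Data(value) => out.push(value),
            DecodeResult::Finished => return Ok(out),
        }
    }
}
```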
```rust
    }

    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
        // Do not compute the projection mask if the predicate cache is disabled
```
this is the fix from the following PR applied to the push decoder (now that the paths are unified)
```rust
    ) -> ReadResult<T> {
        // TODO: calling build_array multiple times is wasteful

        let meta = self.metadata.row_group(row_group_idx);
```
The stream reader has the same logic / algorithm, but now it uses the copy in the push decoder (which is based on this code) instead of this
```rust
        Ok(decode_result)
    }

    /// Attempt to return the next [`ParquetRecordBatchReader`] or return what data is needed
```
This is a new API on the `ParquetPushDecoder` that is needed to implement the existing `next_row_group` API: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html#method.next_row_group
```rust
/// buffering.
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
pub struct ParquetRecordBatchStream<T> {
```
I am quite pleased with this -- `ParquetRecordBatchStream` is now clearly separated into the IO handling piece (`request_state`) and the decoding piece (`decoder`).
```rust
        let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
        match request_state {
            // No outstanding requests, proceed to setup next row group
            RequestState::None { input } => {
```
This is now the core state machine of `ParquetRecordBatchStream`, and I am pleased it represents what is going on in a straightforward way: it alternates between decode and I/O.
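A simplified model of the IO side of that state machine (the real `RequestState` in this PR also owns the input and the in-flight future; this sketch only captures the shape of the states the loop alternates through):

```rust
use std::ops::Range;

// Simplified stand-in for the stream's IO-tracking state; the push decoder
// (not shown) holds all of the CPU-side decoding state. The poll loop hands
// control back and forth: decode until data is needed, then issue a request.
enum RequestState<I> {
    /// No request in flight; the input is available to issue the next one
    None { input: I },
    /// A request for these byte ranges is in flight
    Outstanding { ranges: Vec<Range<u64>> },
    /// The stream will produce no more data
    Done,
}
```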
```rust
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_in_memory_row_group_sparse() {
```
This test was introduced in the following PR by @thinkharderdev.
I believe it is meant to verify the PageIndex is used to prune IO.
The reasons I propose deleting this test are:

- IO pruning based on the PageIndex is covered in the newer IO tests, for example: `// Expect to see only data IO for one page for each column for each row group`
- This test is in terms of non-public APIs (the `ReaderFactory` and `InMemoryRowGroup`) which don't reflect the requests that are actually made (the ranges are coalesced, for example, for each column's pages)
```rust
impl ParquetDecoderState {
    /// If actively reading a RowGroup, return the currently active
    /// ParquetRecordBatchReader and advance to the next group.
    fn try_next_reader(
```
This is a newly added "batched" API that makes it possible to read the next reader (that is ready to produce record batches)
Is this so that we can preserve the `pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>>` API on the async reader?
Yes, exactly.
It actually turns out that is a pretty clever API that I didn't know about -- it lets one interleave IO and CPU more easily.
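For reference, a minimal sketch of how a caller can use `next_row_group` to keep the two kinds of work separate (`process` is just a placeholder callback; the error conversion assumes the parquet crate's `arrow` feature):

```rust
use arrow_array::RecordBatch;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use parquet::errors::Result;

// Drain the stream one row group at a time: next_row_group() does the IO and
// returns a synchronous ParquetRecordBatchReader over the buffered pages, so
// the inner decode loop is pure CPU work that can run off the IO task.
async fn drain_by_row_group<T>(
    mut stream: ParquetRecordBatchStream<T>,
    mut process: impl FnMut(RecordBatch),
) -> Result<()>
where
    T: parquet::arrow::async_reader::AsyncFileReader + Unpin + Send + 'static,
{
    while let Some(reader) = stream.next_row_group().await? {
        for batch in reader {
            process(batch?);
        }
    }
    Ok(())
}
```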
```rust
    ///
    /// This function is called in a loop until the decoder is ready to return
    /// data (has the required pages buffered) or is finished.
    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
```
Reworked so it can be shared between `try_next_batch` and `try_next_reader`.
It also now avoids a self-recursive call, which I think is a (minor) improvement.
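As a generic illustration of the loop-instead-of-recursion point (a hypothetical state machine, not the PR's actual code):

```rust
// Hypothetical state machine, only to illustrate the loop-vs-recursion shape.
enum State {
    NeedMoreWork(u32),
    Ready(u32),
}

impl State {
    // Performs exactly one transition and returns the new state, rather than
    // recursing on itself until it reaches `Ready`.
    fn transition(self) -> State {
        match self {
            State::NeedMoreWork(0) => State::Ready(0),
            State::NeedMoreWork(n) => State::NeedMoreWork(n - 1),
            ready => ready,
        }
    }
}

fn run(mut state: State) -> u32 {
    // The caller drives the machine in a loop until it reports readiness.
    loop {
        state = state.transition();
        if let State::Ready(v) = state {
            return v;
        }
    }
}
```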
```rust
    ///
    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();
```
The whole point of this PR is to remove all this code (and instead use the copy in the push decoder)
```rust
        let request_state = RequestState::None { input: input.0 };

        Ok(ParquetRecordBatchStream {
```
You can see the Stream is much simpler now -- only the decoder and an object to track the current I/O state
This PR is now pretty much ready for review. It builds on a test refactor (#8754, below); once that is merged I will mark this one ready for review.
…level APIs (#8754)

# Which issue does this PR close?

- Related to #8677
- Part of #8159

# Rationale for this change

I am reworking how the parquet decoder's state machine works in #8159. One of the unit tests, `test_cache_projection_excludes_nested_columns`, uses non-public APIs that I am changing. Rather than rewrite them into other non-public APIs, I think it would be better if this test is in terms of public APIs.

# What changes are included in this PR?

1. Refactor `test_cache_projection_excludes_nested_columns` to use high level APIs

# Are these changes tested?

They are run in CI.

I also verified this test covers the intended functionality by commenting it out:

```diff
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -724,7 +724,9 @@ where
             cache_projection.union(predicate.projection());
         }
         cache_projection.intersect(projection);
-        self.exclude_nested_columns_from_cache(&cache_projection)
+        // TEMP don't exclude nested columns
+        //self.exclude_nested_columns_from_cache(&cache_projection)
+        Some(cache_projection)
     }

     /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
```

And then running the test:

```shell
cargo test --all-features --test arrow_reader
```

And the test fails (as expected):

```
---- predicate_cache::test_cache_projection_excludes_nested_columns stdout ----
thread 'predicate_cache::test_cache_projection_excludes_nested_columns' panicked at parquet/tests/arrow_reader/predicate_cache.rs:244:9:
assertion `left == right` failed: Expected 0 records read from cache, but got 100
  left: 100
 right: 0
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

failures:
    predicate_cache::test_cache_projection_excludes_nested_columns

test result: FAILED. 88 passed; 1 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.20s
```

# Are there any user-facing changes?

No, this is only test changes.
```rust
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
        if start > self.file_len {
```
The async decoder doesn't know (or need to know) the entire file length, so I removed this somewhat more specific error message and will instead rely on the underlying source reporting errors when appropriate.
Thank you Andrew. This is my first review in the codebase, but fwiw, this looks good to me.
```rust
        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;

        // Issue a request to fetch a single range, returining the Outstanding state
```
If it's to fetch a single range, why does it take `Vec<Range<u64>>` as a parameter?
I suppose the comment is wrong, not the parameter.
Fixed in ee64444
```rust
        // (aka can have references internally) and thus must
        // own the input while the request is outstanding.
        let future = async move {
            let data = input.get_byte_ranges(ranges_captured).await?;
```
An aside: I don't understand why the default implementation for the AsyncReader fetches range by range sequentially instead of utilizing concurrency of the underlying runtime:

> /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially

Please let me know if it doesn't resonate with you either, and I can open an issue for that.
> An aside: I don't understand why the default implementation for the AsyncReader fetches range by range sequentially instead of utilizing concurrency of the underlying runtime:

I think you are referring to this (arrow-rs/parquet/src/arrow/async_reader/mod.rs, lines 79 to 91 in db876a9):

```rust
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());
            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }
            Ok(result)
        }
        .boxed()
    }
```
I think one reason is that the concurrency model is different depending on the runtime (e.g. the way you launch concurrent IO using tokio is different than how you launch concurrent tasks for io_uring, for example). Also there may be benefits to doing larger swaths of IO -- e.g. S3 doesn't actually support multiple ranges in a single request.

So in my mind the way "utilizing concurrency of the underlying runtime" is achieved is by providing an implementation of AsyncFileReader with the appropriate specialization for get_ranges.

One thing we could consider, FWIW, is to remove the default implementation, which would force each impl to specialize get_ranges 🤔

BTW one of my primary motivations for extracting the parquet state machine into ParquetPushDecoder is precisely to make it easier to do such specialized IO. I have plans to write a blog post about this topic, but it will probably take me another month or so.
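To make the "specialize get_byte_ranges" suggestion concrete, here is a sketch of fetching ranges concurrently; `fetch_range` is a placeholder for whatever ranged read the underlying storage offers, and this is not the crate's default implementation:

```rust
use std::ops::Range;

use futures::future::try_join_all;

// Placeholder for a single ranged read against the underlying storage
// (e.g. an HTTP GET with a Range header); not part of the parquet crate.
async fn fetch_range(url: &str, range: Range<u64>) -> std::io::Result<Vec<u8>> {
    let _ = (url, range);
    Ok(Vec::new())
}

// Issue all ranged reads at once and await them together; try_join_all drives
// the futures concurrently and returns the results in the input order.
async fn fetch_ranges_concurrently(
    url: &str,
    ranges: Vec<Range<u64>>,
) -> std::io::Result<Vec<Vec<u8>>> {
    try_join_all(ranges.into_iter().map(|r| fetch_range(url, r))).await
}
```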
```rust
        } else {
            // All rows skipped, read next row group
            continue;

        let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
```
Why was this ownership trick needed? Perhaps you could comment in the code?
It is the way I could get the Rust ownership rules to be happy (aka ensure that `self.request_state` always has a valid value and can't be in some partial state). I have added a comment in 3ec7448.
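For readers unfamiliar with the idiom, a self-contained example of the `std::mem::replace` pattern (illustrative types, not the parquet ones):

```rust
enum State {
    Idle(String), // owns some resource
    Working,
    Done,
}

struct Machine {
    state: State,
}

impl Machine {
    fn step(&mut self) {
        // We cannot move `self.state` out through `&mut self` directly, because
        // that would leave the field uninitialized. Swapping a placeholder in
        // first keeps `self.state` valid at every point, while giving us
        // ownership of the old state so we can consume it by value.
        let old = std::mem::replace(&mut self.state, State::Done);
        self.state = match old {
            State::Idle(resource) => {
                drop(resource); // we own it, so we can consume it
                State::Working
            }
            State::Working => State::Done,
            State::Done => State::Done,
        };
    }
}
```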
```rust
        let decoder = ParquetPushDecoderBuilder {
            // Async reader doesn't know the overall size of the input, but it
            // is not required for decoding as we will already have the metadata
            input: 0,
```
I missed the previous PR, but am a bit confused with the `input` field of `ArrowReaderBuilder`. Is it meant to represent arbitrary input, specific to a specialized type (in this case file_length for `ParquetPushDecoderBuilder`)? I wonder if it would've been better if we had something like this:

```rust
struct ParquetPushDecoderBuilder {
    reader_builder: ArrowReaderBuilder,
    file_len: u64,
}
```

Just a thought, not intended to be addressed here.
> I wonder if it would've been better if we had something like this:

I agree this would be much cleaner.

The input field is confusing in the context of the "push decoder" as there is (by design) no input.

However, the current structure is designed so the exact same builder code can be shared for the three different decoder types. Using an ArrowReaderBuilder internally is an interesting idea, but we would need to find some way to pass along options (either by duplicating methods from ArrowReaderBuilder to pass through, or constructing the push decoder builder from the ArrowReaderBuilder).

However, I will try and change the type from u64 to some new type where this context can be commented, rather than have this strange 0.
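For example, one possible shape for such a type (just a sketch of the idea, not the final design; using an `Option` is one way to represent "unknown"):

```rust
/// File length passed to the push decoder builder.
///
/// The async reader does not know (or need) the overall file length, since it
/// already has the parquet metadata; a dedicated type lets that be stated
/// explicitly instead of passing a "strange 0".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FileLength(Option<u64>);

impl FileLength {
    fn known(len: u64) -> Self {
        FileLength(Some(len))
    }

    fn unknown() -> Self {
        FileLength(None)
    }
}
```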
Co-authored-by: Vukasin Stefanovic <[email protected]>
Thank you very much for the review @vustef
```rust
            parquet_metadata,
            ArrowReaderOptions::default(),
        )

    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
```
This function was introduced in a PR which has not been released yet -- and thus this is not a breaking API change. Likewise for the changes to `ParquetPushDecoderBuilder`.
# Which issue does this PR close?

- ParquetRecordBatchStream (async API) in terms of the PushDecoder #8677

I am also working on a blog post about this.

TODOs:

- `test_cache_projection_excludes_nested_columns` in terms of higher level APIs (refactor `test_cache_projection_excludes_nested_columns` to use high level APIs #8754)

# Rationale for this change

A new ParquetPushDecoder was implemented here

I need to refactor the async and sync readers to use the new push decoder in order to:

# What changes are included in this PR?

- Rewrite `ParquetRecordBatchStream` to use `ParquetPushDecoder`

# Are these changes tested?

Yes, by the existing CI tests.

I also ran several benchmarks, both in arrow-rs and in DataFusion, and I do not see any substantial performance difference (as expected):

# Are there any user-facing changes?

No