Implement a Vec<RecordBatch> wrapper for pyarrow.Table convenience #8790
Conversation
kylebarron left a comment:
Historically the attitude of this crate has been to avoid "Table" constructs to push users towards streaming approaches.
I don't know what the stance of maintainers is towards including a Table construct for python integration.
FWIW, if you wanted to look at external crates, PyTable exists, which probably does what you want (disclosure: it's my project). Alternatively, it might give you ideas for how to handle the Table here if you still want to do that. (It's a separate crate for these reasons.)
Thanks @kylebarron for your very quick review! ❤️
Yes, I'm also not too sure about it; that's why I just sketched out a rough implementation without tests so far. A reason why I think this potentially could be nice to have in `arrow-rs` itself: it simplifies stuff a bit when you already have a `Vec<RecordBatch>` anyway.

Slightly nicer Python workflow

In our very specific example, we have a Python class with a function such as this one:

```python
class ParquetFile:
    def read_row_group(self, index: int) -> pyarrow.RecordBatch: ...
```

In the issue I linked, this unfortunately breaks down for a specific Parquet file, since a particular row group isn't expressible as a single `RecordBatch`, so the method would have to return an iterator of batches instead. The latter comes with a bit of syntactic shortcomings in contexts where you want to apply `to_pylist`:

```python
rg: pyarrow.RecordBatch | Iterator[pyarrow.RecordBatch] = ParquetFile(...).read_row_group(0)
python_objs: list[dict[str, Any]]
if isinstance(rg, pyarrow.RecordBatch):
    python_objs = rg.to_pylist()
else:
    python_objs = list(itertools.chain.from_iterable(batch.to_pylist() for batch in rg))
```

With a `pyarrow.Table` return type instead, this collapses to:

```python
rg: pyarrow.RecordBatch | pyarrow.Table = ParquetFile(...).read_row_group(0)
python_objs: list[dict[str, Any]] = rg.to_pylist()
```

And just for clarity: we unfortunately need to have the entire row group deserialized as Python objects, because the data ingestion pipelines that consume this expect access to the entire row group in bulk, so streaming approaches are sadly not usable.
Yes, in general, I much prefer the streaming approach as well.
I think that's outdated for Python -> Rust. I haven't tried, but you should be able to pass a `pyarrow.Table` directly into a Rust function accepting `PyArrowType<ArrowArrayStreamReader>`, since `pyarrow.Table` implements the ArrayStream PyCapsule interface. But I assume there's no way today to easily return a `pyarrow.Table`.
I'm fine with that, and I think other maintainers would probably be fine with that too, since it's only a concept that exists in the Python integration. I'm not sure I totally get your example; it seems bad to be returning a union of multiple types to Python. But it seems reasonable to return a `pyarrow.Table`.
Well, there's nothing stopping you from materializing the stream on the Python side. You can use `pyarrow.table(...)` for that, since it accepts anything implementing the ArrowStream PyCapsule interface.
Yes, exactly, that's what I even mentioned here in this PR (https://github.com/apache/arrow-rs/pull/8790/files#diff-2cc622072ff5fa80cf1a32a161da31ac058336ebedfeadbc8532fa52ea4224faR491-R492 + https://github.com/apache/arrow-rs/pull/8790/files#diff-2cc622072ff5fa80cf1a32a161da31ac058336ebedfeadbc8532fa52ea4224faR545-R549): this is even used to convert a `pyarrow.Table` into Rust today. As you said, the opposite, namely easily returning a `pyarrow.Table`, is what's currently missing.
My example wasn't entirely complete, for simplicity (and still isn't); it would be more something like this:

```python
class ParquetFile:
    @overload
    def read_row_group(self, index: int, as_table: Literal[True]) -> pyarrow.Table: ...
    @overload
    def read_row_group(self, index: int, as_table: Literal[False]) -> pyarrow.RecordBatch: ...
    def read_row_group(self, index: int, as_table: bool = False) -> pyarrow.RecordBatch | pyarrow.Table: ...
```

The advantage of that would be that both `pyarrow.RecordBatch` and `pyarrow.Table` satisfy a protocol like this:

```python
class ToListCapable(Protocol):
    def to_pylist(self) -> list[dict[str, Any]]: ...

class ParquetFile:
    def read_row_group(self, index: int, as_table: bool = False) -> ToListCapable: ...
```
Yes, sure! We also do that in other places, or have entirely streamable pipelines elsewhere that use the PyCapsule ArrowStream interface. It's just that for this very specific use case, a `pyarrow.Table` is the more convenient fit.
I am not a Python expert, nor have I fully understood all the discussion on this ticket, but:

This would be my preferred approach -- make it easy to go from Rust <-> Python, while trying to encourage good practices (e.g. streaming). There is no reason to be pedantic and force someone through hoops to make a PyTable if that is what they want.
I now added a bunch of tests in `arrow-pyarrow-integration-tests`.
In arrow-pyarrow/src/lib.rs (outdated diff):
```rust
//! For example, a `pyarrow.Table` can be imported to Rust through `PyArrowType<ArrowArrayStreamReader>`
//! instead (since `pyarrow.Table` implements the ArrayStream PyCapsule interface).
```
I think it would be good to note here that another advantage of using ArrowArrayStreamReader is that it works with tables and stream input out of the box. It doesn't matter which type the user passes in.
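As an illustration of that point, here is a minimal sketch, assuming pyo3 plus the `arrow` crate with its `pyarrow` feature enabled (`count_rows` is a made-up function name): the same Rust signature accepts a `pyarrow.Table`, a `pyarrow.RecordBatchReader`, or any other ArrowStream PyCapsule producer, because `FromPyArrow` for `ArrowArrayStreamReader` goes through the stream interface.

```rust
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::pyarrow::PyArrowType;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

/// Counts rows from whatever stream-capable object Python hands us;
/// a pyarrow.Table and a pyarrow.RecordBatchReader both work unchanged.
#[pyfunction]
fn count_rows(input: PyArrowType<ArrowArrayStreamReader>) -> PyResult<usize> {
    let mut rows = 0;
    // PyArrowType is a thin newtype; `.0` is the ArrowArrayStreamReader,
    // which iterates over Result<RecordBatch, ArrowError>.
    for batch in input.0 {
        rows += batch
            .map_err(|e| PyValueError::new_err(e.to_string()))?
            .num_rows();
    }
    Ok(rows)
}
```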
Also reading through the docs again, I'd suggest making a reference to Box<dyn RecordBatchReader> rather than ArrowArrayStreamReader. The former is a higher level API and much easier to use.
> I think it would be good to note here that another advantage of using ArrowArrayStreamReader is that it works with tables and stream input out of the box.

I added that in the docs.

> Also reading through the docs again, I'd suggest making a reference to Box<dyn RecordBatchReader> rather than ArrowArrayStreamReader. The former is a higher level API and much easier to use.

I'm not exactly sure what you mean here. `Box<dyn RecordBatchReader>` only implements `IntoPyArrow`, but not `FromPyArrow`. So for the example I state in the new documentation, namely that a streaming approach can also be used for consuming a `pyarrow.Table` in Rust, `Box<dyn RecordBatchReader>` sadly doesn't help. One has to use `ArrowArrayStreamReader`, since that properly implements `FromPyArrow`.
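For contrast, here is a hedged sketch of the export direction that does work today (`make_reader` and the column name are made up; same pyo3 + `arrow` setup assumed as above): `Box<dyn RecordBatchReader + Send>` implements `IntoPyArrow`, so it can be returned to Python, where it arrives as a `pyarrow.RecordBatchReader`.

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::pyarrow::PyArrowType;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

/// Returns a stream of batches to Python; the caller can materialize it
/// there if a table is wanted.
#[pyfunction]
fn make_reader() -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .map_err(|e| PyValueError::new_err(e.to_string()))?;
    // RecordBatchIterator adapts a Vec of batches into a RecordBatchReader.
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
    Ok(PyArrowType(Box::new(reader)))
}
```

On the Python side, calling `.read_all()` on the returned reader then yields a `pyarrow.Table`.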
Hey @kylebarron, thanks for the review! I implemented everything as you suggested. As you can see, the CI is now broken because of a subtle problem that was uncovered. Maybe you can help me, as I'm not too familiar with the FFI logic and all my sanity checks did not help me: when a `pyarrow.Table` comes in through the stream interface, the `RecordBatch`es pulled from the reader no longer carry the schema metadata, while the schema reported by the reader itself still does. This previously worked in my earlier implementation.

EDIT: More importantly, omitting the schema check in the `Table` constructor makes the failing tests pass.
@kylebarron for now I stole the `schema_equals` helper from your pyo3-arrow crate.
```rust
/// TODO: Either remove this check, replace it with something already existing in `arrow-rs`
/// or move it to a central `utils` location.
fn schema_equals(left: &SchemaRef, right: &SchemaRef) -> bool {
    left.fields
```
This impl seems incorrect: the `zip()` operation does not check that the two iterators have the same number of items, so it actually only checks that one schema's fields are a prefix of the other's. If `right` has one field more than `left`, this function will still return `true`.

https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=95c113900129b392365cdfb3b4c2b4e6
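A length-safe variant, sketched here for illustration (names kept from the quoted snippet; the per-field comparison semantics are an assumption, since the rest of the helper isn't shown in the diff): compare the field counts first so that `zip()` can no longer silently ignore the longer side.

```rust
use arrow::datatypes::SchemaRef;

/// Compares two schemas field by field while ignoring schema-level metadata.
/// The explicit length check closes the hole in the zip()-only version.
fn schema_equals(left: &SchemaRef, right: &SchemaRef) -> bool {
    left.fields().len() == right.fields().len()
        && left
            .fields()
            .iter()
            .zip(right.fields().iter())
            .all(|(l, r)| l == r)
}
```

Equivalently, `left.fields() == right.fields()` performs the same element-wise comparison including the length check.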
In principle, instead of using this schema check method at all, I'd much rather have the underlying issue solved by understanding why either the ArrowStream PyCapsule interface of `pyarrow.Table` or the Rust `Box<dyn RecordBatchReader>` part seems to swallow up `RecordBatch` metadata.

If that issue were fixed, then this function could be left out again and a normal `schema == record_batch.schema()` test could be used. This function would only have relevance if it's expected that a `RecordBatch` coming from a stream reader somehow doesn't have metadata anymore.

But in general your comment would also be relevant for @kylebarron, as he is using this function as-is in his crate pyo3-arrow.
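For reference, a small self-contained sketch of that plain check (written for this writeup; `validate_batches` is a made-up helper name mirroring the constructor loop quoted below):

```rust
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;

/// The straightforward variant: full Schema equality, metadata included.
fn validate_batches(schema: &SchemaRef, record_batches: &[RecordBatch]) -> Result<(), ArrowError> {
    for record_batch in record_batches {
        if schema != &record_batch.schema() {
            return Err(ArrowError::SchemaError(
                "All record batches must have the same schema.".to_owned(),
            ));
        }
    }
    Ok(())
}
```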
```rust
for record_batch in &record_batches {
    if !schema_equals(&schema, &record_batch.schema()) {
        return Err(ArrowError::SchemaError(
            //"All record batches must have the same schema.".to_owned(),
```
| //"All record batches must have the same schema.".to_owned(), |
I only have the more verbose error message here right now to understand what's going on with the schema mismatch. The simple message is currently commented out to signal that this is not intended to be merged as-is; the schema-mismatch issue shall be understood first.

In general I'm opinionless about how verbose the error message should be, and I'd happily remove whatever variant you dislike eventually.
Rationale for this change
When dealing with Parquet files that have an exceedingly large amount of Binary or UTF8 data in one row group, there can be issues when returning a single RecordBatch because of index overflows (#7973).
In `pyarrow` this is usually solved by representing data as a `pyarrow.Table` object whose columns are `ChunkedArray`s, which basically are just lists of Arrow arrays; alternatively, the `pyarrow.Table` can be seen as a representation of a list of `RecordBatch`es.

I'd like to build a function in PyO3 that returns a `pyarrow.Table`, very similar to pyarrow's `read_row_group` method. With that, we could have feature parity with `pyarrow` in circumstances of potential index overflows, without resorting to type changes (such as reading the data as `LargeString` or `StringView` columns).

Currently, AFAIS, there is no way in `arrow-pyarrow` to export a `pyarrow.Table` directly. Especially, convenience methods from `Vec<RecordBatch>` seem to be missing. This PR tries to implement a convenience wrapper that allows directly exporting a `pyarrow.Table`.

What changes are included in this PR?
A new struct `Table` in the crate `arrow-pyarrow` is added, which can be constructed from `Vec<RecordBatch>` or from `ArrowArrayStreamReader`.

It implements `FromPyArrow` and `IntoPyArrow`. `FromPyArrow` will support anything that either implements the ArrowStream PyCapsule protocol, is a RecordBatchReader, or has a `to_reader()` method returning one; `pyarrow.Table` satisfies both the protocol and `to_reader()`. `IntoPyArrow` will result in a `pyarrow.Table` on the Python side, constructed through `pyarrow.Table.from_batches(...)`.
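To make that export path concrete, here is a hedged sketch of what "constructed through `pyarrow.Table.from_batches(...)`" can look like via pyo3. This is an illustration written for this description, not the PR's actual code, and it assumes a recent pyo3 where `Python::import` returns a `Bound` module:

```rust
use arrow::array::RecordBatch;
use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;

/// Converts a slice of RecordBatches into a pyarrow.Table by converting
/// each batch individually and then calling pyarrow.Table.from_batches.
fn batches_to_pyarrow_table<'py>(
    py: Python<'py>,
    batches: &[RecordBatch],
) -> PyResult<Bound<'py, PyAny>> {
    let py_batches = batches
        .iter()
        .map(|b| b.to_pyarrow(py))
        .collect::<PyResult<Vec<_>>>()?;
    // Vec<PyObject> converts to a Python list when passed as an argument.
    py.import("pyarrow")?
        .getattr("Table")?
        .call_method1("from_batches", (py_batches,))
}
```

Note that `from_batches` needs an explicit schema for an empty list of batches, which is presumably why the wrapper keeps a `SchemaRef` alongside the `Vec<RecordBatch>`.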
Are these changes tested?

Yes, in `arrow-pyarrow-integration-tests`.

Are there any user-facing changes?

A new `Table` convenience wrapper is added!