Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 7 additions & 115 deletions datafusion/proto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,121 +17,13 @@
under the License.
-->

# Apache Arrow DataFusion Proto
# `datafusion-proto`: Apache Arrow DataFusion Proto Serialization / Deserialization

Apache Arrow [DataFusion][df] is an extensible query execution framework,
written in Rust, that uses Apache Arrow as its in-memory format.
This crate contains code to convert Apache Arrow [DataFusion] plans to and from
bytes, which can be useful for sending plans over the network, for example
when building a distributed query engine.

This crate provides support format for serializing and deserializing the
following structures to and from bytes:
See [API Docs] for details and examples.

1. [`LogicalPlan`]'s (including [`Expr`]),
2. [`ExecutionPlan`]s (including [`PhysiscalExpr`])

This format can be useful for sending plans over the network, for example when
building a distributed query engine.

Internally, this crate is implemented by converting the plans to [protocol
buffers] using [prost].

[protocol buffers]: https://developers.google.com/protocol-buffers
[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
[`expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/expr/enum.Expr.html
[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
[`physiscalexpr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/trait.PhysicalExpr.html
[prost]: https://docs.rs/prost/latest/prost/

## See Also

The binary format created by this crate supports the full range of DataFusion
plans, but is DataFusion specific. See [datafusion-substrait] which can encode
many DataFusion plans using the [substrait.io] standard.

[datafusion-substrait]: https://docs.rs/datafusion-substrait/latest/datafusion_substrait
[substrait.io]: https://substrait.io

# Examples

## Serializing Expressions

Based on [examples/expr_serde.rs](examples/expr_serde.rs)

```rust
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr};
use datafusion_proto::bytes::Serializeable;

fn main() -> Result<()> {
// Create a new `Expr` a < 32
let expr = col("a").lt(lit(5i32));

// Convert it to an opaque form
let bytes = expr.to_bytes()?;

// Decode bytes from somewhere (over network, etc.)
let decoded_expr = Expr::from_bytes(&bytes)?;
assert_eq!(expr, decoded_expr);
Ok(())
}
```

## Serializing Logical Plans

Based on [examples/logical_plan_serde.rs](examples/logical_plan_serde.rs)

```rust
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default())
.await
?;
let plan = ctx.table("t1").await?.into_optimized_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
```

## Serializing Physical Plans

Based on [examples/physical_plan_serde.rs](examples/physical_plan_serde.rs)

```rust
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes};

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default())
.await
?;
let logical_plan = ctx.table("t1").await?.into_optimized_plan()?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
let bytes = physical_plan_to_bytes(physical_plan.clone())?;
let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
Ok(())
}

```

## Generated Code

The prost/tonic code can be generated by running, which in turn invokes the Rust binary located in [gen](./gen)

This is necessary after modifying the protobuf definitions or altering the dependencies of [gen](./gen), and requires a
valid installation of [protoc](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation).

```bash
./regen.sh
```

[df]: https://crates.io/crates/datafusion
[datafusion]: https://arrow.apache.org/datafusion
[api docs]: http://docs.rs/datafusion-substrait/latest/datafusion-substrait
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be datafusion-proto?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, excellent catch. Fixed in f596397

29 changes: 29 additions & 0 deletions datafusion/proto/REAME-dev.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed this before. Filed #9300

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Generated Code

The prost/tonic code can be generated by running, which in turn invokes the Rust binary located in [gen](./gen)

This is necessary after modifying the protobuf definitions or altering the dependencies of [gen](./gen), and requires a
valid installation of [protoc](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link to the DataFusion doc about this for consistency: https://arrow.apache.org/datafusion/contributor-guide/#protoc-installation

Or better to have doc direct from source?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a link to the rendered docs makes sense -- updated in aa09db6. Thanks for the suggestion


```bash
./regen.sh
```
33 changes: 0 additions & 33 deletions datafusion/proto/examples/expr_serde.rs

This file was deleted.

32 changes: 0 additions & 32 deletions datafusion/proto/examples/logical_plan_serde.rs

This file was deleted.

36 changes: 0 additions & 36 deletions datafusion/proto/examples/physical_plan_serde.rs

This file was deleted.

92 changes: 90 additions & 2 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,96 @@
// specific language governing permissions and limitations
// under the License.

//! Serde code for logical plans and expressions.
//! Serialize / Deserialize DataFusion Plans to bytes
//!
//! This crate provides support for serializing and deserializing the
//! following structures to and from bytes:
//!
//! 1. [`LogicalPlan`]'s (including [`Expr`]),
//! 2. [`ExecutionPlan`]s (including [`PhysicalExpr`])
//!
//! [`LogicalPlan`]: datafusion_expr::LogicalPlan
//! [`Expr`]: datafusion_expr::Expr
//! [`ExecutionPlan`]: datafusion::physical_plan::ExecutionPlan
//! [`PhysicalExpr`]: datafusion::physical_expr::PhysicalExpr
//!
//! Internally, this crate is implemented by converting the plans to [protocol
//! buffers] using [prost].
//!
//! [protocol buffers]: https://developers.google.com/protocol-buffers
//! [prost]: https://docs.rs/prost/latest/prost/
//!
//! # See Also
//!
//! The binary format created by this crate supports the full range of DataFusion
//! plans, but is DataFusion specific. See [datafusion-substrait] for a crate
//! which can encode many DataFusion plans using the [substrait.io] standard.
Comment on lines +45 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since format is DataFusion specific, are we able to give any info on version compatibility here, etc.? If that has been decided yet 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call -- I think the current state of affairs is that there is no gaurantee about version compatibility, and I added that in 7d91abf. If version compatibility is a desired feature I think what is most important would be a test harness for verifying it (which is a non trivial undertaking)

//!
//! [datafusion-substrait]: https://docs.rs/datafusion-substrait/latest/datafusion_substrait
//! [substrait.io]: https://substrait.io
//!
//! # Example: Serializing [`Expr`]s
//! ```
//! # use datafusion_common::Result;
//! # use datafusion_expr::{col, lit, Expr};
//! # use datafusion_proto::bytes::Serializeable;
//! # fn main() -> Result<()>{
//! // Create a new `Expr` a < 32
//! let expr = col("a").lt(lit(5i32));
//!
//! // Convert it to bytes (for sending over the network, etc.)
//! let bytes = expr.to_bytes()?;
//!
//! // Decode bytes from somewhere (over network, etc.) back to Expr
//! let decoded_expr = Expr::from_bytes(&bytes)?;
//! assert_eq!(expr, decoded_expr);
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Serializing [`LogicalPlan`]s
//! ```
//! # use datafusion::prelude::*;
//! # use datafusion_common::Result;
//! # use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
//! # #[tokio::main]
//! # async fn main() -> Result<()>{
//! // Create a plan that scans table 't'
//! let ctx = SessionContext::new();
//! ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()).await?;
//! let plan = ctx.table("t1").await?.into_optimized_plan()?;
//!
//! // Convert the plan into bytes (for sending over the network, etc.)
//! let bytes = logical_plan_to_bytes(&plan)?;
//!
//! // Decode bytes from somewhere (over network, etc.) back to LogicalPlan
//! let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
//! assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
//! # Ok(())
//! # }
//! ```
//! # Example: Serializing [`ExecutionPlan`]s
//!
//! ```
//! # use datafusion::prelude::*;
//! # use datafusion_common::Result;
//! # use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes};
//! # #[tokio::main]
//! # async fn main() -> Result<()>{
//! // Create a plan that scans table 't'
//! let ctx = SessionContext::new();
//! ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()).await?;
//! let physical_plan = ctx.table("t1").await?.create_physical_plan().await?;
//!
//! // Convert the plan into bytes (for sending over the network, etc.)
//! let bytes = physical_plan_to_bytes(physical_plan.clone())?;
//!
//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan
//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?;
//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
//! # Ok(())
//! # }
//! ```
pub mod bytes;
pub mod common;
pub mod generated;
Expand Down
18 changes: 5 additions & 13 deletions datafusion/substrait/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,10 @@
under the License.
-->

# DataFusion + Substrait
# Apache Arrow DataFusion Substrait

[Substrait](https://substrait.io/) provides a cross-language serialization format for relational algebra, based on
protocol buffers.
This crate contains a [Substrait] producer and consumer for Apache Arrow
[DataFusion] plans. See [API Docs] for details and examples.

This repository provides a Substrait producer and consumer for DataFusion:

- The producer converts a DataFusion logical plan into a Substrait protobuf.
- The consumer converts a Substrait protobuf into a DataFusion logical plan.

Potential uses of this crate:

- Replace the current [DataFusion protobuf definition](https://github.com/apache/arrow-datafusion/blob/main/datafusion/proto/proto/datafusion.proto) used in Ballista for passing query plan fragments to executors
- Make it easier to pass query plans over FFI boundaries, such as from Python to Rust
- Allow Apache Calcite query plans to be executed in DataFusion
[substrait]: https://substrait.io
[api docs]: https://docs.rs/datafusion-substrait/latest/datafusion_substrait
Loading