diff --git a/datafusion/proto/README-dev.md b/datafusion/proto/README-dev.md new file mode 100644 index 000000000000..b793b47e76a6 --- /dev/null +++ b/datafusion/proto/README-dev.md @@ -0,0 +1,32 @@ + + +## Generated Code + +The prost/tonic code can be generated by running `./regen.sh`, which in turn invokes the Rust binary located in [gen](./gen) + +This is necessary after modifying the protobuf definitions or altering the dependencies of [gen](./gen), and requires a +valid installation of [protoc] (see [installation instructions] for details). + +```bash +./regen.sh +``` + +[protoc]: https://github.com/protocolbuffers/protobuf#protocol-compiler-installation +[installation instructions]: https://arrow.apache.org/datafusion/contributor-guide/#protoc-installation diff --git a/datafusion/proto/README.md b/datafusion/proto/README.md index 8d25f193fa6b..ca6ae7fc68f4 100644 --- a/datafusion/proto/README.md +++ b/datafusion/proto/README.md @@ -17,121 +17,13 @@ under the License. --> -# Apache Arrow DataFusion Proto +# `datafusion-proto`: Apache Arrow DataFusion Protobuf Serialization / Deserialization -Apache Arrow [DataFusion][df] is an extensible query execution framework, -written in Rust, that uses Apache Arrow as its in-memory format. +This crate contains code to convert Apache Arrow [DataFusion] plans to and from +bytes, which can be useful for sending plans over the network, for example +when building a distributed query engine. -This crate provides support format for serializing and deserializing the -following structures to and from bytes: +See [API Docs] for details and examples. -1. [`LogicalPlan`]'s (including [`Expr`]), -2. [`ExecutionPlan`]s (including [`PhysiscalExpr`]) - -This format can be useful for sending plans over the network, for example when -building a distributed query engine. - -Internally, this crate is implemented by converting the plans to [protocol -buffers] using [prost]. 
- -[protocol buffers]: https://developers.google.com/protocol-buffers -[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html -[`expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/expr/enum.Expr.html -[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html -[`physiscalexpr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/trait.PhysicalExpr.html -[prost]: https://docs.rs/prost/latest/prost/ - -## See Also - -The binary format created by this crate supports the full range of DataFusion -plans, but is DataFusion specific. See [datafusion-substrait] which can encode -many DataFusion plans using the [substrait.io] standard. - -[datafusion-substrait]: https://docs.rs/datafusion-substrait/latest/datafusion_substrait -[substrait.io]: https://substrait.io - -# Examples - -## Serializing Expressions - -Based on [examples/expr_serde.rs](examples/expr_serde.rs) - -```rust -use datafusion_common::Result; -use datafusion_expr::{col, lit, Expr}; -use datafusion_proto::bytes::Serializeable; - -fn main() -> Result<()> { - // Create a new `Expr` a < 32 - let expr = col("a").lt(lit(5i32)); - - // Convert it to an opaque form - let bytes = expr.to_bytes()?; - - // Decode bytes from somewhere (over network, etc.) 
- let decoded_expr = Expr::from_bytes(&bytes)?; - assert_eq!(expr, decoded_expr); - Ok(()) -} -``` - -## Serializing Logical Plans - -Based on [examples/logical_plan_serde.rs](examples/logical_plan_serde.rs) - -```rust -use datafusion::prelude::*; -use datafusion_common::Result; -use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; - -#[tokio::main] -async fn main() -> Result<()> { - let ctx = SessionContext::new(); - ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) - .await - ?; - let plan = ctx.table("t1").await?.into_optimized_plan()?; - let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; - assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); - Ok(()) -} -``` - -## Serializing Physical Plans - -Based on [examples/physical_plan_serde.rs](examples/physical_plan_serde.rs) - -```rust -use datafusion::prelude::*; -use datafusion_common::Result; -use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes}; - -#[tokio::main] -async fn main() -> Result<()> { - let ctx = SessionContext::new(); - ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) - .await - ?; - let logical_plan = ctx.table("t1").await?.into_optimized_plan()?; - let physical_plan = ctx.create_physical_plan(&logical_plan).await?; - let bytes = physical_plan_to_bytes(physical_plan.clone())?; - let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; - assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip)); - Ok(()) -} - -``` - -## Generated Code - -The prost/tonic code can be generated by running, which in turn invokes the Rust binary located in [gen](./gen) - -This is necessary after modifying the protobuf definitions or altering the dependencies of [gen](./gen), and requires a -valid installation of [protoc](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation). 
- -```bash -./regen.sh -``` - -[df]: https://crates.io/crates/datafusion +[datafusion]: https://arrow.apache.org/datafusion +[api docs]: https://docs.rs/datafusion-proto/latest diff --git a/datafusion/proto/examples/expr_serde.rs b/datafusion/proto/examples/expr_serde.rs deleted file mode 100644 index 9da64f87e2b1..000000000000 --- a/datafusion/proto/examples/expr_serde.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use datafusion_common::Result; -use datafusion_expr::{col, lit, Expr}; -use datafusion_proto::bytes::Serializeable; - -fn main() -> Result<()> { - // Create a new `Expr` a < 32 - let expr = col("a").lt(lit(5i32)); - - // Convert it to an opaque form - let bytes = expr.to_bytes()?; - - // Decode bytes from somewhere (over network, etc.) 
- let decoded_expr = Expr::from_bytes(&bytes)?; - assert_eq!(expr, decoded_expr); - Ok(()) -} diff --git a/datafusion/proto/examples/logical_plan_serde.rs b/datafusion/proto/examples/logical_plan_serde.rs deleted file mode 100644 index 9f468638c150..000000000000 --- a/datafusion/proto/examples/logical_plan_serde.rs +++ /dev/null @@ -1,32 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use datafusion::prelude::*; -use datafusion_common::Result; -use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; - -#[tokio::main] -async fn main() -> Result<()> { - let ctx = SessionContext::new(); - ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) - .await?; - let plan = ctx.table("t1").await?.into_optimized_plan()?; - let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; - assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); - Ok(()) -} diff --git a/datafusion/proto/examples/physical_plan_serde.rs b/datafusion/proto/examples/physical_plan_serde.rs deleted file mode 100644 index 72e216074a16..000000000000 --- a/datafusion/proto/examples/physical_plan_serde.rs +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use datafusion::prelude::*; -use datafusion_common::Result; -use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes}; - -#[tokio::main] -async fn main() -> Result<()> { - let ctx = SessionContext::new(); - ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) - .await?; - let dataframe = ctx.table("t1").await?; - let physical_plan = dataframe.create_physical_plan().await?; - let bytes = physical_plan_to_bytes(physical_plan.clone())?; - let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; - assert_eq!( - format!("{physical_plan:?}"), - format!("{physical_round_trip:?}") - ); - Ok(()) -} diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 872c26408ebe..5d60b9b57454 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -15,8 +15,102 @@ // specific language governing permissions and limitations // under the License. -//! Serde code for logical plans and expressions. - +//! Serialize / Deserialize DataFusion Plans to bytes +//! +//! This crate provides support for serializing and deserializing the +//! following structures to and from bytes: +//! +//! 1. [`LogicalPlan`]s (including [`Expr`]), +//! 2. [`ExecutionPlan`]s (including [`PhysicalExpr`]) +//! +//! [`LogicalPlan`]: datafusion_expr::LogicalPlan +//! [`Expr`]: datafusion_expr::Expr +//! [`ExecutionPlan`]: datafusion::physical_plan::ExecutionPlan +//! [`PhysicalExpr`]: datafusion::physical_expr::PhysicalExpr +//! +//! Internally, this crate is implemented by converting the plans to [protocol +//! buffers] using [prost]. +//! +//! [protocol buffers]: https://developers.google.com/protocol-buffers +//! [prost]: https://docs.rs/prost/latest/prost/ +//! +//! # Version Compatibility +//! +//! The serialized forms are not guaranteed to be compatible across +//! DataFusion versions. A plan serialized with one version of DataFusion +//! may not be able to be deserialized with a different version. +//! +//! 
# See Also +//! +//! The binary format created by this crate supports the full range of DataFusion +//! plans, but is DataFusion specific. See [datafusion-substrait] for a crate +//! which can encode many DataFusion plans using the [substrait.io] standard. +//! +//! [datafusion-substrait]: https://docs.rs/datafusion-substrait/latest/datafusion_substrait +//! [substrait.io]: https://substrait.io +//! +//! # Example: Serializing [`Expr`]s +//! ``` +//! # use datafusion_common::Result; +//! # use datafusion_expr::{col, lit, Expr}; +//! # use datafusion_proto::bytes::Serializeable; +//! # fn main() -> Result<()>{ +//! // Create a new `Expr` a < 5 +//! let expr = col("a").lt(lit(5i32)); +//! +//! // Convert it to bytes (for sending over the network, etc.) +//! let bytes = expr.to_bytes()?; +//! +//! // Decode bytes from somewhere (over network, etc.) back to Expr +//! let decoded_expr = Expr::from_bytes(&bytes)?; +//! assert_eq!(expr, decoded_expr); +//! # Ok(()) +//! # } +//! ``` +//! +//! # Example: Serializing [`LogicalPlan`]s +//! ``` +//! # use datafusion::prelude::*; +//! # use datafusion_common::Result; +//! # use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; +//! # #[tokio::main] +//! # async fn main() -> Result<()>{ +//! // Create a plan that scans table 't1' +//! let ctx = SessionContext::new(); +//! ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()).await?; +//! let plan = ctx.table("t1").await?.into_optimized_plan()?; +//! +//! // Convert the plan into bytes (for sending over the network, etc.) +//! let bytes = logical_plan_to_bytes(&plan)?; +//! +//! // Decode bytes from somewhere (over network, etc.) back to LogicalPlan +//! let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; +//! assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); +//! # Ok(()) +//! # } +//! ``` +//! # Example: Serializing [`ExecutionPlan`]s +//! +//! ``` +//! # use datafusion::prelude::*; +//! 
# use datafusion_common::Result; +//! # use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes}; +//! # #[tokio::main] +//! # async fn main() -> Result<()>{ +//! // Create a plan that scans table 't1' +//! let ctx = SessionContext::new(); +//! ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()).await?; +//! let physical_plan = ctx.table("t1").await?.create_physical_plan().await?; +//! +//! // Convert the plan into bytes (for sending over the network, etc.) +//! let bytes = physical_plan_to_bytes(physical_plan.clone())?; +//! +//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan +//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; +//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip)); +//! # Ok(()) +//! # } +//! ``` pub mod bytes; pub mod common; pub mod generated; diff --git a/datafusion/substrait/README.md b/datafusion/substrait/README.md index 17591cdd6232..a9f2ba4c3c52 100644 --- a/datafusion/substrait/README.md +++ b/datafusion/substrait/README.md @@ -17,18 +17,10 @@ under the License. --> -# DataFusion + Substrait +# Apache Arrow DataFusion Substrait -[Substrait](https://substrait.io/) provides a cross-language serialization format for relational algebra, based on -protocol buffers. +This crate contains a [Substrait] producer and consumer for Apache Arrow +[DataFusion](https://arrow.apache.org/datafusion) plans. See [API Docs] for details and examples. -This repository provides a Substrait producer and consumer for DataFusion: - -- The producer converts a DataFusion logical plan into a Substrait protobuf. -- The consumer converts a Substrait protobuf into a DataFusion logical plan. 
- -Potential uses of this crate: - -- Replace the current [DataFusion protobuf definition](https://github.com/apache/arrow-datafusion/blob/main/datafusion/proto/proto/datafusion.proto) used in Ballista for passing query plan fragments to executors -- Make it easier to pass query plans over FFI boundaries, such as from Python to Rust -- Allow Apache Calcite query plans to be executed in DataFusion +[substrait]: https://substrait.io +[api docs]: https://docs.rs/datafusion-substrait/latest diff --git a/datafusion/substrait/src/lib.rs b/datafusion/substrait/src/lib.rs index 432553ec7903..454f0e7b7cb9 100644 --- a/datafusion/substrait/src/lib.rs +++ b/datafusion/substrait/src/lib.rs @@ -15,6 +15,63 @@ // specific language governing permissions and limitations // under the License. +//! Serialize / Deserialize DataFusion Plans to [Substrait.io] +//! +//! This crate provides support for serializing and deserializing both DataFusion +//! [`LogicalPlan`] and [`ExecutionPlan`] to and from the generated types in +//! [substrait::proto] from the [substrait] crate. +//! +//! [Substrait.io] provides a cross-language serialization format for relational +//! algebra (e.g. query plans and expressions), based on protocol buffers. +//! +//! [Substrait.io]: https://substrait.io/ +//! +//! [`LogicalPlan`]: datafusion::logical_expr::LogicalPlan +//! [`ExecutionPlan`]: datafusion::physical_plan::ExecutionPlan +//! +//! Potential uses of this crate: +//! * Use DataFusion to run Substrait plans created by other systems (e.g. Apache Calcite) +//! * Use DataFusion to create plans to run on other systems +//! * Pass query plans over FFI boundaries, such as from Python to Rust +//! * Pass query plans across node boundaries +//! +//! # See Also +//! +//! Substrait does not (yet) support the full range of plans and expressions +//! that DataFusion offers. See the [datafusion-proto] crate for a DataFusion +//! specific format that does support the full range. +//! +//! 
[datafusion-proto]: https://docs.rs/datafusion-proto/latest/datafusion_proto +//! +//! Note that generated types such as [`substrait::proto::Plan`] and +//! [`substrait::proto::Rel`] can be serialized / deserialized to bytes, JSON and +//! other formats using [prost] and the rest of the Rust protobuf ecosystem. +//! +//! # Example: Serializing [`LogicalPlan`]s +//! ``` +//! # use datafusion::prelude::*; +//! # use datafusion::error::Result; +//! # #[tokio::main(flavor = "current_thread")] +//! # async fn main() -> Result<()>{ +//! # use std::sync::Arc; +//! # use datafusion::arrow::array::{Int32Array, RecordBatch}; +//! # use datafusion_substrait::logical_plan; +//! // Create a plan that scans table 't' +//! let ctx = SessionContext::new(); +//! let batch = RecordBatch::try_from_iter(vec![("x", Arc::new(Int32Array::from(vec![42])) as _)])?; +//! ctx.register_batch("t", batch)?; +//! let df = ctx.sql("SELECT x from t").await?; +//! let plan = df.into_optimized_plan()?; +//! +//! // Convert the plan into a substrait (protobuf) Plan +//! let substrait_plan = logical_plan::producer::to_substrait_plan(&plan, &ctx)?; +//! +//! // Receive a substrait protobuf from somewhere, and turn it into a LogicalPlan +//! let logical_round_trip = logical_plan::consumer::from_substrait_plan(&ctx, &substrait_plan).await?; +//! assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); +//! # Ok(()) +//! # } +//! ``` pub mod logical_plan; pub mod physical_plan; pub mod serializer;