diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 1029b4c8555c..4a288f819ecf 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -25,8 +25,10 @@ rust-version = "1.62" [dependencies] async-recursion = "1.0" +chrono = "0.4.23" datafusion = { version = "17.0.0", path = "../core" } itertools = "0.10.5" +object_store = "0.5.3" prost = "0.11" prost-types = "0.11" substrait = "0.4" diff --git a/datafusion/substrait/src/lib.rs b/datafusion/substrait/src/lib.rs index 53d0f874633e..34c9c46edf0c 100644 --- a/datafusion/substrait/src/lib.rs +++ b/datafusion/substrait/src/lib.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -pub mod consumer; -pub mod producer; +pub mod logical_plan; +pub mod physical_plan; pub mod serializer; // Re-export substrait crate diff --git a/datafusion/substrait/src/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs similarity index 100% rename from datafusion/substrait/src/consumer.rs rename to datafusion/substrait/src/logical_plan/consumer.rs diff --git a/datafusion/substrait/src/logical_plan/mod.rs b/datafusion/substrait/src/logical_plan/mod.rs new file mode 100644 index 000000000000..6f8b8e493f52 --- /dev/null +++ b/datafusion/substrait/src/logical_plan/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod consumer; +pub mod producer; diff --git a/datafusion/substrait/src/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs similarity index 100% rename from datafusion/substrait/src/producer.rs rename to datafusion/substrait/src/logical_plan/producer.rs diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs new file mode 100644 index 000000000000..0c65beb4e1e5 --- /dev/null +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_recursion::async_recursion; +use chrono::DateTime; +use datafusion::arrow::datatypes::Schema; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::error::{DataFusionError, Result}; +use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use object_store::ObjectMeta; +use std::collections::HashMap; +use std::sync::Arc; +use substrait::proto::read_rel::local_files::file_or_files::PathType; +use substrait::proto::{ + expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel, +}; + +/// Convert Substrait Rel to DataFusion ExecutionPlan +#[async_recursion] +pub async fn from_substrait_rel( + _ctx: &mut SessionContext, + rel: &Rel, + _extensions: &HashMap, +) -> Result> { + match &rel.rel_type { + Some(RelType::Read(read)) => { + if read.filter.is_some() || read.best_effort_filter.is_some() { + return Err(DataFusionError::NotImplemented( + "Read with filter is not supported".to_string(), + )); + } + if read.base_schema.is_some() { + return Err(DataFusionError::NotImplemented( + "Read with schema is not supported".to_string(), + )); + } + if read.advanced_extension.is_some() { + return Err(DataFusionError::NotImplemented( + "Read with AdvancedExtension is not supported".to_string(), + )); + } + match &read.as_ref().read_type { + Some(ReadType::LocalFiles(files)) => { + let mut file_groups = vec![]; + + for file in &files.items { + let path = if let Some(path_type) = &file.path_type { + match path_type { + PathType::UriPath(path) => Ok(path.clone()), + PathType::UriPathGlob(path) => Ok(path.clone()), + PathType::UriFile(path) => Ok(path.clone()), + PathType::UriFolder(path) => Ok(path.clone()), + } + } else { + Err(DataFusionError::Substrait( + "Missing PathType".to_string(), + )) + }?; + + // TODO substrait plans do not have `last_modified` or `size` but `ObjectMeta` + // requires them both - perhaps we can change the object-store crate + // to make these optional? We cannot guarantee that we have access to the + // files to get this information, depending on how this library is being + // used + let last_modified = DateTime::parse_from_str( + "1970 Jan 1 00:00:00.000 +0000", + "%Y %b %d %H:%M:%S%.3f %z", + ) + .unwrap(); + let size = 0; + + let partitioned_file = PartitionedFile { + object_meta: ObjectMeta { + last_modified: last_modified.into(), + location: path.into(), + size, + }, + partition_values: vec![], + range: None, + extensions: None, + }; + + let part_index = file.partition_index as usize; + while part_index >= file_groups.len() { + file_groups.push(vec![]); + } + file_groups[part_index].push(partitioned_file) + } + + let mut base_config = FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_schema: Arc::new(Schema::empty()), + file_groups, + statistics: Default::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }; + + if let Some(MaskExpression { select, .. }) = &read.projection { + if let Some(projection) = &select.as_ref() { + let column_indices: Vec = projection + .struct_items + .iter() + .map(|item| item.field as usize) + .collect(); + base_config.projection = Some(column_indices); + } + } + + Ok(Arc::new(ParquetExec::new(base_config, None, None)) + as Arc) + } + _ => Err(DataFusionError::NotImplemented( + "Only LocalFile reads are supported when parsing physical" + .to_string(), + )), + } + } + _ => Err(DataFusionError::NotImplemented(format!( + "Unsupported RelType: {:?}", + rel.rel_type + ))), + } +} diff --git a/datafusion/substrait/src/physical_plan/mod.rs b/datafusion/substrait/src/physical_plan/mod.rs new file mode 100644 index 000000000000..6f8b8e493f52 --- /dev/null +++ b/datafusion/substrait/src/physical_plan/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod consumer; +pub mod producer; diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs new file mode 100644 index 000000000000..c8d739ecdad5 --- /dev/null +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::error::{DataFusionError, Result}; +use datafusion::physical_plan::file_format::ParquetExec; +use datafusion::physical_plan::{displayable, ExecutionPlan}; +use std::collections::HashMap; +use substrait::proto::expression::MaskExpression; +use substrait::proto::extensions; +use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions; +use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType}; +use substrait::proto::read_rel::local_files::FileOrFiles; +use substrait::proto::read_rel::LocalFiles; +use substrait::proto::read_rel::ReadType; +use substrait::proto::rel::RelType; +use substrait::proto::ReadRel; +use substrait::proto::Rel; + +/// Convert DataFusion ExecutionPlan to Substrait Rel +pub fn to_substrait_rel( + plan: &dyn ExecutionPlan, + _extension_info: &mut ( + Vec, + HashMap, + ), +) -> Result> { + if let Some(scan) = plan.as_any().downcast_ref::() { + let base_config = scan.base_config(); + let mut substrait_files = vec![]; + for (partition_index, files) in base_config.file_groups.iter().enumerate() { + for file in files { + substrait_files.push(FileOrFiles { + partition_index: partition_index.try_into().unwrap(), + start: 0, + length: file.object_meta.size as u64, + path_type: Some(PathType::UriPath( + file.object_meta.location.as_ref().to_string(), + )), + file_format: Some(FileFormat::Parquet(ParquetReadOptions {})), + }); + } + } + + Ok(Box::new(Rel { + rel_type: Some(RelType::Read(Box::new(ReadRel { + common: None, + base_schema: None, + filter: None, + best_effort_filter: None, + projection: Some(MaskExpression { + select: None, + maintain_singular_struct: false, + }), + advanced_extension: None, + read_type: Some(ReadType::LocalFiles(LocalFiles { + items: substrait_files, + advanced_extension: None, + })), + }))), + })) + } else { + Err(DataFusionError::Substrait(format!( + "Unsupported plan in Substrait physical plan producer: {}", + displayable(plan).one_line() + ))) + } +} diff --git a/datafusion/substrait/src/serializer.rs b/datafusion/substrait/src/serializer.rs index e6844edef3a7..322629c04a27 100644 --- a/datafusion/substrait/src/serializer.rs +++ b/datafusion/substrait/src/serializer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::producer; +use crate::logical_plan::producer; use datafusion::common::DataFusionError; use datafusion::error::Result; diff --git a/datafusion/substrait/tests/roundtrip.rs b/datafusion/substrait/tests/roundtrip_logical_plan.rs similarity index 99% rename from datafusion/substrait/tests/roundtrip.rs rename to datafusion/substrait/tests/roundtrip_logical_plan.rs index 481e6c83e4a5..c2ab3df3cb40 100644 --- a/datafusion/substrait/tests/roundtrip.rs +++ b/datafusion/substrait/tests/roundtrip_logical_plan.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_substrait::consumer; -use datafusion_substrait::producer; +use datafusion_substrait::logical_plan::{consumer, producer}; #[cfg(test)] mod tests { diff --git a/datafusion/substrait/tests/roundtrip_physical_plan.rs b/datafusion/substrait/tests/roundtrip_physical_plan.rs new file mode 100644 index 000000000000..d213ccf18735 --- /dev/null +++ b/datafusion/substrait/tests/roundtrip_physical_plan.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod tests { + use datafusion::arrow::datatypes::Schema; + use datafusion::datasource::listing::PartitionedFile; + use datafusion::datasource::object_store::ObjectStoreUrl; + use datafusion::error::Result; + use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; + use datafusion::physical_plan::{displayable, ExecutionPlan}; + use datafusion::prelude::SessionContext; + use datafusion_substrait::physical_plan::{consumer, producer}; + use std::collections::HashMap; + use std::sync::Arc; + use substrait::proto::extensions; + + #[tokio::test] + async fn parquet_exec() -> Result<()> { + let scan_config = FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_schema: Arc::new(Schema::empty()), + file_groups: vec![ + vec![PartitionedFile::new( + "file://foo/part-0.parquet".to_string(), + 123, + )], + vec![PartitionedFile::new( + "file://foo/part-1.parquet".to_string(), + 123, + )], + ], + statistics: Default::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }; + let parquet_exec: Arc = + Arc::new(ParquetExec::new(scan_config, None, None)); + + let mut extension_info: ( + Vec, + HashMap, + ) = (vec![], HashMap::new()); + + let substrait_rel = + producer::to_substrait_rel(parquet_exec.as_ref(), &mut extension_info)?; + + let mut ctx = SessionContext::new(); + + let parquet_exec_roundtrip = consumer::from_substrait_rel( + &mut ctx, + substrait_rel.as_ref(), + &HashMap::new(), + ) + .await?; + + let expected = format!("{}", displayable(parquet_exec.as_ref()).indent()); + let actual = format!("{}", displayable(parquet_exec_roundtrip.as_ref()).indent()); + assert_eq!(expected, actual); + + Ok(()) + } +} diff --git a/datafusion/substrait/tests/serialize.rs b/datafusion/substrait/tests/serialize.rs index cf70604eb6ca..15328494111e 100644 --- a/datafusion/substrait/tests/serialize.rs +++ b/datafusion/substrait/tests/serialize.rs @@ -17,8 +17,7 @@ #[cfg(test)] mod tests { - - use datafusion_substrait::consumer::from_substrait_plan; + use datafusion_substrait::logical_plan::consumer::from_substrait_plan; use datafusion_substrait::serializer; use datafusion::error::Result;