Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ rust-version = "1.62"
[dependencies]
async-recursion = "1.0"
datafusion = { version = "17.0.0", path = "../core" }
chrono = "0.4.23"
itertools = "0.10.5"
object_store = "0.5.3"
prost = "0.11"
prost-types = "0.11"
substrait = "0.4"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/substrait/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

pub mod consumer;
pub mod producer;
pub mod logical_plan;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like these name changes. They make it much clearer what the library is trying to offer.

pub mod physical_plan;
pub mod serializer;

// Re-export substrait crate
Expand Down
19 changes: 19 additions & 0 deletions datafusion/substrait/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod consumer;
pub mod producer;
144 changes: 144 additions & 0 deletions datafusion/substrait/src/physical_plan/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_recursion::async_recursion;
use chrono::DateTime;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use object_store::ObjectMeta;
use std::collections::HashMap;
use std::sync::Arc;
use substrait::proto::read_rel::local_files::file_or_files::PathType;
use substrait::proto::{
expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel,
};

/// Convert Substrait Rel to DataFusion ExecutionPlan
///
/// Only `RelType::Read` relations whose read type is `ReadType::LocalFiles` are
/// handled. Such a relation is turned into a `ParquetExec` whose file groups are
/// built from the listed files, grouped by each file's `partition_index`.
///
/// # Arguments
///
/// * `_ctx` - not referenced by the current body
/// * `rel` - the Substrait relation to convert
/// * `_extensions` - not referenced by the current body
///
/// # Errors
///
/// Returns `DataFusionError::NotImplemented` when the read carries a filter, a
/// best-effort filter, a base schema or an advanced extension, when the read type
/// is anything other than local files, or when `rel` is not a read relation.
/// Returns `DataFusionError::Substrait` when a file entry has no `path_type`.
#[async_recursion]
pub async fn from_substrait_rel(
_ctx: &mut SessionContext,
rel: &Rel,
_extensions: &HashMap<u32, &String>,
) -> Result<Arc<dyn ExecutionPlan>> {
match &rel.rel_type {
Some(RelType::Read(read)) => {
// Reject every ReadRel feature this conversion does not translate, rather than
// silently ignoring it.
if read.filter.is_some() || read.best_effort_filter.is_some() {
return Err(DataFusionError::NotImplemented(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just because this is a first pass at getting the framework together or is it truly not possible with DataFusion today?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just a first pass. I plan on following up with smaller PRs to make this usable.

"Read with filter is not supported".to_string(),
));
}
if read.base_schema.is_some() {
return Err(DataFusionError::NotImplemented(
"Read with schema is not supported".to_string(),
));
}
if read.advanced_extension.is_some() {
return Err(DataFusionError::NotImplemented(
"Read with AdvancedExtension is not supported".to_string(),
));
}
match &read.as_ref().read_type {
Some(ReadType::LocalFiles(files)) => {
// file_groups[i] collects the files whose `partition_index` is `i`.
let mut file_groups = vec![];

for file in &files.items {
// All four path variants are handled the same way: the string is cloned and
// later used as the file location as-is; nothing here expands globs or folders.
let path = if let Some(path_type) = &file.path_type {
match path_type {
PathType::UriPath(path) => Ok(path.clone()),
PathType::UriPathGlob(path) => Ok(path.clone()),
PathType::UriFile(path) => Ok(path.clone()),
PathType::UriFolder(path) => Ok(path.clone()),
}
} else {
Err(DataFusionError::Substrait(
"Missing PathType".to_string(),
))
}?;

// TODO substrait plans do not have `last_modified` or `size` but `ObjectMeta`
// requires them both - perhaps we can change the object-store crate
// to make these optional? We cannot guarantee that we have access to the
// files to get this information, depending on how this library is being
// used
// Placeholder metadata: the Unix epoch as `last_modified` and a size of 0.
let last_modified = DateTime::parse_from_str(
"1970 Jan 1 00:00:00.000 +0000",
"%Y %b %d %H:%M:%S%.3f %z",
)
.unwrap();
let size = 0;

let partitioned_file = PartitionedFile {
object_meta: ObjectMeta {
last_modified: last_modified.into(),
location: path.into(),
size,
},
partition_values: vec![],
range: None,
extensions: None,
};

// Grow `file_groups` with empty groups until index `part_index` exists, then
// append the file to that partition's group.
// NOTE(review): `partition_index` is taken directly from the plan; a very large
// value makes this loop push that many empty groups - confirm whether an upper
// bound should be enforced.
let part_index = file.partition_index as usize;
while part_index >= file_groups.len() {
file_groups.push(vec![]);
}
file_groups[part_index].push(partitioned_file)
}

// The scan is configured against the local-filesystem object store URL with an
// empty file schema; the projection is filled in below when the plan has one.
let mut base_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: Arc::new(Schema::empty()),
file_groups,
statistics: Default::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
};

// When the read has a mask projection with a struct selection, the `field` of each
// struct item becomes a projected column index. Only `field` is read from each item.
if let Some(MaskExpression { select, .. }) = &read.projection {
if let Some(projection) = &select.as_ref() {
let column_indices: Vec<usize> = projection
.struct_items
.iter()
.map(|item| item.field as usize)
.collect();
base_config.projection = Some(column_indices);
}
}

Ok(Arc::new(ParquetExec::new(base_config, None, None))
as Arc<dyn ExecutionPlan>)
}
_ => Err(DataFusionError::NotImplemented(
"Only LocalFile reads are supported when parsing physical"
.to_string(),
)),
}
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported RelType: {:?}",
rel.rel_type
))),
}
}
19 changes: 19 additions & 0 deletions datafusion/substrait/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod consumer;
pub mod producer;
81 changes: 81 additions & 0 deletions datafusion/substrait/src/physical_plan/producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::file_format::ParquetExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use std::collections::HashMap;
use substrait::proto::expression::MaskExpression;
use substrait::proto::extensions;
use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions;
use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType};
use substrait::proto::read_rel::local_files::FileOrFiles;
use substrait::proto::read_rel::LocalFiles;
use substrait::proto::read_rel::ReadType;
use substrait::proto::rel::RelType;
use substrait::proto::ReadRel;
use substrait::proto::Rel;

/// Convert DataFusion ExecutionPlan to Substrait Rel
pub fn to_substrait_rel(
plan: &dyn ExecutionPlan,
_extension_info: &mut (
Vec<extensions::SimpleExtensionDeclaration>,
HashMap<String, u32>,
),
) -> Result<Box<Rel>> {
if let Some(scan) = plan.as_any().downcast_ref::<ParquetExec>() {
let base_config = scan.base_config();
let mut substrait_files = vec![];
for (partition_index, files) in base_config.file_groups.iter().enumerate() {
for file in files {
substrait_files.push(FileOrFiles {
partition_index: partition_index.try_into().unwrap(),
start: 0,
length: file.object_meta.size as u64,
path_type: Some(PathType::UriPath(
file.object_meta.location.as_ref().to_string(),
)),
file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
});
}
}

Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
base_schema: None,
filter: None,
best_effort_filter: None,
projection: Some(MaskExpression {
select: None,
maintain_singular_struct: false,
}),
advanced_extension: None,
read_type: Some(ReadType::LocalFiles(LocalFiles {
items: substrait_files,
advanced_extension: None,
})),
}))),
}))
} else {
Err(DataFusionError::Substrait(format!(
"Unsupported plan in Substrait physical plan producer: {}",
displayable(plan).one_line()
)))
}
}
2 changes: 1 addition & 1 deletion datafusion/substrait/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::producer;
use crate::logical_plan::producer;

use datafusion::common::DataFusionError;
use datafusion::error::Result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_substrait::consumer;
use datafusion_substrait::producer;
use datafusion_substrait::logical_plan::{consumer, producer};

#[cfg(test)]
mod tests {
Expand Down
80 changes: 80 additions & 0 deletions datafusion/substrait/tests/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[cfg(test)]
mod tests {
    use datafusion::arrow::datatypes::Schema;
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::object_store::ObjectStoreUrl;
    use datafusion::error::Result;
    use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
    use datafusion::physical_plan::{displayable, ExecutionPlan};
    use datafusion::prelude::SessionContext;
    use datafusion_substrait::physical_plan::{consumer, producer};
    use std::collections::HashMap;
    use std::sync::Arc;
    use substrait::proto::extensions;

    /// Round-trips a two-partition `ParquetExec` through the Substrait physical
    /// plan producer and consumer, then checks that the indented display of the
    /// rebuilt plan equals that of the original.
    #[tokio::test]
    async fn parquet_exec() -> Result<()> {
        // One single-file group per partition.
        let file_groups: Vec<Vec<PartitionedFile>> =
            ["file://foo/part-0.parquet", "file://foo/part-1.parquet"]
                .iter()
                .map(|uri| vec![PartitionedFile::new(uri.to_string(), 123)])
                .collect();

        let original: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::local_filesystem(),
                file_schema: Arc::new(Schema::empty()),
                file_groups,
                statistics: Default::default(),
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: None,
                infinite_source: false,
            },
            None,
            None,
        ));

        // DataFusion plan -> Substrait relation.
        let mut extension_info: (
            Vec<extensions::SimpleExtensionDeclaration>,
            HashMap<String, u32>,
        ) = (Vec::new(), HashMap::new());
        let rel = producer::to_substrait_rel(original.as_ref(), &mut extension_info)?;

        // Substrait relation -> DataFusion plan.
        let mut ctx = SessionContext::new();
        let no_extensions = HashMap::new();
        let rebuilt =
            consumer::from_substrait_rel(&mut ctx, rel.as_ref(), &no_extensions).await?;

        let expected = displayable(original.as_ref()).indent().to_string();
        let actual = displayable(rebuilt.as_ref()).indent().to_string();
        assert_eq!(expected, actual);

        Ok(())
    }
}
3 changes: 1 addition & 2 deletions datafusion/substrait/tests/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

#[cfg(test)]
mod tests {

use datafusion_substrait::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::serializer;

use datafusion::error::Result;
Expand Down