Skip to content

Commit de3deb3

Browse files
authored
Add initial support for serializing physical plans with Substrait (#5176)
1 parent d5077b5 commit de3deb3

File tree

12 files changed

+350
-7
lines changed

12 files changed

+350
-7
lines changed

datafusion/substrait/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ rust-version = "1.62"
2525

2626
[dependencies]
2727
async-recursion = "1.0"
28+
chrono = "0.4.23"
2829
datafusion = { version = "17.0.0", path = "../core" }
2930
itertools = "0.10.5"
31+
object_store = "0.5.3"
3032
prost = "0.11"
3133
prost-types = "0.11"
3234
substrait = "0.4"

datafusion/substrait/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
pub mod consumer;
19-
pub mod producer;
18+
pub mod logical_plan;
19+
pub mod physical_plan;
2020
pub mod serializer;
2121

2222
// Re-export substrait crate
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub mod consumer;
19+
pub mod producer;
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use async_recursion::async_recursion;
19+
use chrono::DateTime;
20+
use datafusion::arrow::datatypes::Schema;
21+
use datafusion::datasource::listing::PartitionedFile;
22+
use datafusion::datasource::object_store::ObjectStoreUrl;
23+
use datafusion::error::{DataFusionError, Result};
24+
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
25+
use datafusion::physical_plan::ExecutionPlan;
26+
use datafusion::prelude::SessionContext;
27+
use object_store::ObjectMeta;
28+
use std::collections::HashMap;
29+
use std::sync::Arc;
30+
use substrait::proto::read_rel::local_files::file_or_files::PathType;
31+
use substrait::proto::{
32+
expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel,
33+
};
34+
35+
/// Convert Substrait Rel to DataFusion ExecutionPlan
36+
#[async_recursion]
37+
pub async fn from_substrait_rel(
38+
_ctx: &mut SessionContext,
39+
rel: &Rel,
40+
_extensions: &HashMap<u32, &String>,
41+
) -> Result<Arc<dyn ExecutionPlan>> {
42+
match &rel.rel_type {
43+
Some(RelType::Read(read)) => {
44+
if read.filter.is_some() || read.best_effort_filter.is_some() {
45+
return Err(DataFusionError::NotImplemented(
46+
"Read with filter is not supported".to_string(),
47+
));
48+
}
49+
if read.base_schema.is_some() {
50+
return Err(DataFusionError::NotImplemented(
51+
"Read with schema is not supported".to_string(),
52+
));
53+
}
54+
if read.advanced_extension.is_some() {
55+
return Err(DataFusionError::NotImplemented(
56+
"Read with AdvancedExtension is not supported".to_string(),
57+
));
58+
}
59+
match &read.as_ref().read_type {
60+
Some(ReadType::LocalFiles(files)) => {
61+
let mut file_groups = vec![];
62+
63+
for file in &files.items {
64+
let path = if let Some(path_type) = &file.path_type {
65+
match path_type {
66+
PathType::UriPath(path) => Ok(path.clone()),
67+
PathType::UriPathGlob(path) => Ok(path.clone()),
68+
PathType::UriFile(path) => Ok(path.clone()),
69+
PathType::UriFolder(path) => Ok(path.clone()),
70+
}
71+
} else {
72+
Err(DataFusionError::Substrait(
73+
"Missing PathType".to_string(),
74+
))
75+
}?;
76+
77+
// TODO substrait plans do not have `last_modified` or `size` but `ObjectMeta`
78+
// requires them both - perhaps we can change the object-store crate
79+
// to make these optional? We cannot guarantee that we have access to the
80+
// files to get this information, depending on how this library is being
81+
// used
82+
let last_modified = DateTime::parse_from_str(
83+
"1970 Jan 1 00:00:00.000 +0000",
84+
"%Y %b %d %H:%M:%S%.3f %z",
85+
)
86+
.unwrap();
87+
let size = 0;
88+
89+
let partitioned_file = PartitionedFile {
90+
object_meta: ObjectMeta {
91+
last_modified: last_modified.into(),
92+
location: path.into(),
93+
size,
94+
},
95+
partition_values: vec![],
96+
range: None,
97+
extensions: None,
98+
};
99+
100+
let part_index = file.partition_index as usize;
101+
while part_index >= file_groups.len() {
102+
file_groups.push(vec![]);
103+
}
104+
file_groups[part_index].push(partitioned_file)
105+
}
106+
107+
let mut base_config = FileScanConfig {
108+
object_store_url: ObjectStoreUrl::local_filesystem(),
109+
file_schema: Arc::new(Schema::empty()),
110+
file_groups,
111+
statistics: Default::default(),
112+
projection: None,
113+
limit: None,
114+
table_partition_cols: vec![],
115+
output_ordering: None,
116+
infinite_source: false,
117+
};
118+
119+
if let Some(MaskExpression { select, .. }) = &read.projection {
120+
if let Some(projection) = &select.as_ref() {
121+
let column_indices: Vec<usize> = projection
122+
.struct_items
123+
.iter()
124+
.map(|item| item.field as usize)
125+
.collect();
126+
base_config.projection = Some(column_indices);
127+
}
128+
}
129+
130+
Ok(Arc::new(ParquetExec::new(base_config, None, None))
131+
as Arc<dyn ExecutionPlan>)
132+
}
133+
_ => Err(DataFusionError::NotImplemented(
134+
"Only LocalFile reads are supported when parsing physical"
135+
.to_string(),
136+
)),
137+
}
138+
}
139+
_ => Err(DataFusionError::NotImplemented(format!(
140+
"Unsupported RelType: {:?}",
141+
rel.rel_type
142+
))),
143+
}
144+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub mod consumer;
19+
pub mod producer;
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion::error::{DataFusionError, Result};
19+
use datafusion::physical_plan::file_format::ParquetExec;
20+
use datafusion::physical_plan::{displayable, ExecutionPlan};
21+
use std::collections::HashMap;
22+
use substrait::proto::expression::MaskExpression;
23+
use substrait::proto::extensions;
24+
use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions;
25+
use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType};
26+
use substrait::proto::read_rel::local_files::FileOrFiles;
27+
use substrait::proto::read_rel::LocalFiles;
28+
use substrait::proto::read_rel::ReadType;
29+
use substrait::proto::rel::RelType;
30+
use substrait::proto::ReadRel;
31+
use substrait::proto::Rel;
32+
33+
/// Convert DataFusion ExecutionPlan to Substrait Rel
34+
pub fn to_substrait_rel(
35+
plan: &dyn ExecutionPlan,
36+
_extension_info: &mut (
37+
Vec<extensions::SimpleExtensionDeclaration>,
38+
HashMap<String, u32>,
39+
),
40+
) -> Result<Box<Rel>> {
41+
if let Some(scan) = plan.as_any().downcast_ref::<ParquetExec>() {
42+
let base_config = scan.base_config();
43+
let mut substrait_files = vec![];
44+
for (partition_index, files) in base_config.file_groups.iter().enumerate() {
45+
for file in files {
46+
substrait_files.push(FileOrFiles {
47+
partition_index: partition_index.try_into().unwrap(),
48+
start: 0,
49+
length: file.object_meta.size as u64,
50+
path_type: Some(PathType::UriPath(
51+
file.object_meta.location.as_ref().to_string(),
52+
)),
53+
file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
54+
});
55+
}
56+
}
57+
58+
Ok(Box::new(Rel {
59+
rel_type: Some(RelType::Read(Box::new(ReadRel {
60+
common: None,
61+
base_schema: None,
62+
filter: None,
63+
best_effort_filter: None,
64+
projection: Some(MaskExpression {
65+
select: None,
66+
maintain_singular_struct: false,
67+
}),
68+
advanced_extension: None,
69+
read_type: Some(ReadType::LocalFiles(LocalFiles {
70+
items: substrait_files,
71+
advanced_extension: None,
72+
})),
73+
}))),
74+
}))
75+
} else {
76+
Err(DataFusionError::Substrait(format!(
77+
"Unsupported plan in Substrait physical plan producer: {}",
78+
displayable(plan).one_line()
79+
)))
80+
}
81+
}

datafusion/substrait/src/serializer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::producer;
18+
use crate::logical_plan::producer;
1919

2020
use datafusion::common::DataFusionError;
2121
use datafusion::error::Result;

datafusion/substrait/tests/roundtrip.rs renamed to datafusion/substrait/tests/roundtrip_logical_plan.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion_substrait::consumer;
19-
use datafusion_substrait::producer;
18+
use datafusion_substrait::logical_plan::{consumer, producer};
2019

2120
#[cfg(test)]
2221
mod tests {

0 commit comments

Comments
 (0)