
Complete substrait support for ParquetExec round trip  #9347

@alamb

Description

@devinjdangelo says

I looked into the example and the physical plan substrait producer/consumer code. Unfortunately, for physical plans the substrait consumer and producer are only implemented for ParquetExec, and even that implementation is incomplete, so I do not believe any practical example will execute without further development.

Here is an example that gets further than the one above but panics on the round-trip assertion:

use datafusion::prelude::*;
use std::collections::HashMap;
use datafusion::error::Result;
use datafusion_substrait::physical_plan;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Create a plan that scans table 't'
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;
    let df = ctx.sql("SELECT * FROM alltypes_plain").await?;

    let physical_plan = df.create_physical_plan().await?;

    // Convert the plan into a substrait (protobuf) Rel
    let mut extension_info = (vec![], HashMap::new());
    let substrait_plan =
        physical_plan::producer::to_substrait_rel(physical_plan.as_ref(), &mut extension_info)?;

    // Convert the substrait Rel back into an ExecutionPlan
    let physical_round_trip =
        physical_plan::consumer::from_substrait_rel(&ctx, &substrait_plan, &HashMap::new())
            .await?;
    assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
    Ok(())
}
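
(To run this you need the datafusion, datafusion-substrait, and tokio crates; datafusion::test_util::parquet_test_data() locates the parquet-testing data, if I recall correctly via the PARQUET_TEST_DATA environment variable or the repository's parquet-testing submodule.)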

And here is the panic output:

thread 'main' panicked at datafusion/substrait/src/lib.rs:37:2:
assertion `left == right` failed
  left: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(8), total_byte_size: Exact(671), column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], projected_statistics: Statistics { num_rows: Exact(8), total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, projected_schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bool_col\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"tinyint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"smallint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"int_col\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bigint_col\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"float_col\", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"double_col\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"date_string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_col\", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
 right: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, projected_schema: Schema { fields: [], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
stack backtrace:
...

You can see that the round trip lost many details of the ParquetExec, such as projected_schema and projected_statistics: the right-hand plan comes back with an empty schema and no column statistics.
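
As an aside, the lost information can be pinpointed without diffing the full Debug output. Here is a minimal sketch (hypothetical; it reuses physical_plan and physical_round_trip from the example above, in place of the final assert_eq!) that checks just the schema:

    // Hypothetical, narrower replacement for the Debug-format assertion:
    // compare only the schemas. Given the panic output above, the
    // round-tripped plan comes back with an empty schema, so this check
    // should also fail, but with a far more readable message.
    assert_eq!(
        physical_plan.schema(),
        physical_round_trip.schema(),
        "round trip dropped the projected schema"
    );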

I think that if we want to include a user-facing example of a physical plan substrait round trip, we will first need to cut a ticket to complete the ParquetExec substrait implementation.

It looks like #5176 built the initial framework for serializing physical plans, but it hasn't been picked up since then.

Originally posted by @devinjdangelo in #9299 (comment)
