147 changes: 147 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
@@ -45,6 +45,7 @@ use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> {
assert_snapshot!(
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
@r#"
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] |
| | Union |
| | TableScan: sorted projection=[id] |
| | Sort: unsorted.id ASC NULLS LAST |
| | TableScan: unsorted projection=[id] |
| physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
| | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] |
| | UnionExec |
| | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
| | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
"#);
Ok(())
}

#[ignore] // See https://github.com/apache/datafusion/issues/18380
Contributor: why are we ignoring the test when the plan is also commented out in the body?

Contributor: The plan is not commented out. I asked @rgehan to add the relevant part of the verbose explain so we can see when the issue happens; that was the commented-out plan.

#[tokio::test]
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> {
assert_snapshot!(
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
@r#"
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] |
| | Union |
| | TableScan: sorted projection=[id] |
| | Sort: unsorted.id ASC NULLS LAST |
| | TableScan: unsorted projection=[id] |
| physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
| | SortPreservingMergeExec: [id@0 ASC NULLS LAST] |
| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
| | UnionExec |
| | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
| | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
"#);

// 💥 Doesn't pass, and generates this plan:
//
// AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
// SortPreservingMergeExec: [id@0 ASC NULLS LAST]
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
// UnionExec
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
//
//
// === Excerpt from the verbose explain ===
//
// Physical_plan after EnforceDistribution:
//
// OutputRequirementExec: order_by=[], dist_by=Unspecified
// AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
// CoalescePartitionsExec
// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
// UnionExec
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
//
// Physical_plan after EnforceSorting:
//
// OutputRequirementExec: order_by=[], dist_by=Unspecified
// AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
// SortPreservingMergeExec: [id@0 ASC NULLS LAST]
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
// UnionExec
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
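//
// A minimal sketch of how the verbose excerpt above can be reproduced (assuming the
// same DataFrame built in the impl function below): flip the first flag of
// `explain(verbose, analyze)`, the same API used at the end of that function:
//
//   let verbose_plan = df.explain(true, false)?.collect().await?;
//   println!("{}", pretty_format_batches(&verbose_plan)?);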

Ok(())
}

async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(repartition_sorts: bool) -> Result<String> {
let config = SessionConfig::default()
.with_target_partitions(1)
.with_repartition_sorts(repartition_sorts);
let ctx = SessionContext::new_with_config(config);

let testdata = parquet_test_data();

// Register "sorted" table, that is sorted
ctx.register_parquet(
"sorted",
&format!("{testdata}/alltypes_tiny_pages.parquet"),
ParquetReadOptions::default()
.file_sort_order(vec![vec![col("id").sort(true, false)]]),
Contributor Author: (Sidenote: Interestingly, with nulls_first: true (L3074 too), even with the fixes from #9867, the plan includes an extra SortExec node that re-sorts with nulls last. I'm not sure whether that's on purpose, or if there's another issue.)

Contributor: Do you know whether the file is actually sorted, or did you just add this function to trick the planner into planning this file as if it were sorted?

Contributor Author: The file is not actually sorted, no, but I was hoping this was a valid way of making the planner think it is and plan accordingly. Can this cause issues?

Contributor (@NGA-TRAN, Oct 30, 2025): Re "Can this cause issues?": likely not, but I am not 100% sure whether we do anything special with the parquet file.
)
.await?;
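
// A minimal sketch (not exercised by this test) of the `nulls_first: true` variant
// mentioned in the review thread above; only the second argument of `sort`
// (asc, nulls_first) changes:
//
//   ParquetReadOptions::default()
//       .file_sort_order(vec![vec![col("id").sort(true, true)]])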

// Register "unsorted" table
ctx.register_parquet(
"unsorted",
&format!("{testdata}/alltypes_tiny_pages.parquet"),
ParquetReadOptions::default()
)
.await?;

let source_sorted = ctx
.table("sorted")
.await
.unwrap()
.select(vec![col("id")])
.unwrap();

let source_unsorted = ctx
.table("unsorted")
.await
.unwrap()
.select(vec![col("id")])
.unwrap();

let source_unsorted_resorted = source_unsorted
.sort(vec![col("id").sort(true, false)])?;

let union = source_sorted.union(source_unsorted_resorted)?;

let agg = union.aggregate(vec![col("id")], vec![])?;

let df = agg;

// Strip user-specific paths from the plan so the assertions stay stable
let testdata_clean = Path::new(&testdata).canonicalize()?.display().to_string();
let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean);

let plan = df.explain(false, false)?.collect().await?;
Ok(pretty_format_batches(&plan)?.to_string().replace(&testdata_clean, "{testdata}"))
}

#[tokio::test]
async fn test_count_wildcard_on_aggregate() -> Result<()> {
let ctx = create_join_context()?;
81 changes: 81 additions & 0 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -359,6 +359,87 @@ async fn test_union_inputs_different_sorted2() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> {
assert_snapshot!(
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
@r"
Input Plan:
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
CoalescePartitionsExec
UnionExec
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet

Optimized Plan:
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
SortPreservingMergeExec: [nullable_col@0 ASC]
UnionExec
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet
");
Ok(())
}

#[tokio::test]
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> {
assert_snapshot!(
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
@r"
Input Plan:
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
CoalescePartitionsExec
UnionExec
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet

Optimized Plan:
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
CoalescePartitionsExec
UnionExec
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet
");
Ok(())
}

async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(repartition_sorts: bool) -> Result<String> {
let schema = create_test_schema()?;

// Source 1, will be sorted explicitly (on `nullable_col`)
let source1 = parquet_exec(schema.clone());
let ordering1 = [sort_expr("nullable_col", &schema)].into();
let sort1 = sort_exec(ordering1, source1.clone());

// Source 2, pre-sorted (on `nullable_col`)
let parquet_ordering: LexOrdering = [sort_expr("nullable_col", &schema)].into();
let source2 = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering.clone()]);

let union = union_exec(vec![sort1, source2]);

let coalesced = coalesce_partitions_exec(union);

// Require sorted output on a single partition
let requirement = [PhysicalSortRequirement::new(
col("nullable_col", &schema)?,
Some(SortOptions::new(false, true)),
)]
.into();
let physical_plan = Arc::new(OutputRequirementExec::new(
coalesced,
Some(OrderingRequirements::new(requirement)),
Distribution::SinglePartition,
None,
));

let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts);
Ok(test.run())
}

#[tokio::test]
async fn test_union_inputs_different_sorted3() -> Result<()> {
let schema = create_test_schema()?;