Skip to content

Conversation

@rgehan
Copy link
Contributor

@rgehan rgehan commented Oct 29, 2025

Which issue does this PR close?

None, but relates to issue #9898

Rationale for this change

N/A

What changes are included in this PR?

This PR adds reproducer tests demonstrating issues with suboptimal optimizations performed on plans that mix pre-sorted parquets and SortExec under an UNION.

Two sets of tests included:

  • Unit tests in datafusion/core/tests/physical_optimizer/enforce_sorting.rs
  • E2E-ish tests in datafusion/core/tests/dataframe/mod.rs, starting from logical plans simulating our use-case

Note

These tests pass with the changes from #9867

Are these changes tested?

N/A

Are there any user-facing changes?

N/A

@github-actions github-actions bot added the core Core DataFusion crate label Oct 29, 2025
"sorted",
&format!("{testdata}/alltypes_tiny_pages.parquet"),
ParquetReadOptions::default()
.file_sort_order(vec![vec![col("id").sort(true, false)]]),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Sidenote: Interestingly, with nulls_first: true (L3074 too), even with the fixes from #9867, the plan includes an extra SortExec node that re-sorts with nulls last. I'm not sure whether that's on purpose, or if there's another issue)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know whether the file is actually sorted or you just add this function to trick the planner to plan this file as it is sorted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file is not actually sorted no, but I was hoping this was a valid way of making the planner think it is, and plan accordingly.

Can this cause issues?

Copy link
Contributor

@NGA-TRAN NGA-TRAN Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this cause issues?

Likely not but I am not % sure if we do anything special with parquet file.

Copy link
Contributor

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reproducer @rgehan . Now I understand what you are trying to do. I have a few comments to make the tests clearer

"sorted",
&format!("{testdata}/alltypes_tiny_pages.parquet"),
ParquetReadOptions::default()
.file_sort_order(vec![vec![col("id").sort(true, false)]]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know whether the file is actually sorted or you just add this function to trick the planner to plan this file as it is sorted?

SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet
");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this test is to let us know we need repartition_sorts = true for it to work. This works as expected: DF understands the 2 input of the Union is sorted and only do the merge after that. It will fail if repartition_sorts = false

async fn reproducer_with_repartition_sorts_false() -> Result<()> {
reproducer_impl(false).await?;

// 💥 Doesn't pass, and generates this plan:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is expected that this test fails here. See

pub repartition_sorts: bool, default = true

| | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Since the 2 input streams out of the union are sorted, the 2 streams coming out from Partial Aggregate are also sorted. Thus, we should only do the merge

And this only happens with repartition_sorts = true

pub repartition_sorts: bool, default = true

// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
// UnionExec
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since repartition_sorts= false. This test fails as expected. I would modify the test to make it pass instead of not pass. See my comment for reproducer_e2e_impl below

Ok(())
}

async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you modify this function to accept 2 parameters: repartition_sorts and expected_plan? Then at the comparison step, you compare with expected_plan. This will help make the tests clearer . You have 4 tests in this PR and only one should fail. The other 3 will pass

Ok(())
}

async fn reproducer_impl(repartition_sorts: bool) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, can you have this function to accept 2 inputs and have all its 2 tests passed

@rgehan
Copy link
Contributor Author

rgehan commented Oct 30, 2025

@NGA-TRAN thanks a lot for the review and clarifications! I've adapted the PR to hopefully make it clearer what's expected / what's not. I'll also create an issue

@rgehan
Copy link
Contributor Author

rgehan commented Oct 30, 2025

Here's the corresponding feature request: #18380

@NGA-TRAN
Copy link
Contributor

@NGA-TRAN thanks a lot for the review and clarifications! I've adapted the PR to hopefully make it clearer what's expected / what's not. I'll also create an issue

Once you've updated the PR so that three tests pass and one fails, mark the failing test as ignored (with comment) and move it to "ready for review." It's great that the repro is included in the repo.

@rgehan rgehan changed the title Reproducer tests for #9898 Reproducer tests for #18380 Oct 30, 2025
@rgehan rgehan marked this pull request as ready for review October 30, 2025 12:04
@rgehan rgehan requested a review from NGA-TRAN October 30, 2025 12:05
Copy link
Contributor

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great. Just a few minor comment from me.

@alamb This is a good reproducer for a nice optimization request

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rgehan and @NGA-TRAN

Ok(())
}

#[ignore] // See https://github.com/apache/datafusion/issues/18380
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we ignoring the test when the plan is also commented out in the body?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is not commented out.
I asked @rgehan to add the relevant part of explain verbose for us see when the issue happens that was the commented out plan

@alamb alamb changed the title Reproducer tests for #18380 Reproducer tests for #18380 (resorting sorted inputs) Oct 30, 2025
@rgehan
Copy link
Contributor Author

rgehan commented Oct 30, 2025

@NGA-TRAN I've applied the requested changes 👍

@rgehan
Copy link
Contributor Author

rgehan commented Nov 1, 2025

I fixed the formatting/clippy issues, but one of the tests is still failing for formatting reasons.

The plan in the snapshot refers to a machine-dependent filepath, so I had tweaked the test logic to replace it with some constant, but failed to realize this would impact the formatting of the snapshot too (since this is a space-padded table).

@rgehan
Copy link
Contributor Author

rgehan commented Nov 1, 2025

I tweaked the tests to use simpler snapshots (no more table format), hopefully this should pass on the CI now 👍

@alamb alamb added this pull request to the merge queue Nov 3, 2025
@alamb
Copy link
Contributor

alamb commented Nov 3, 2025

Thanks again @rgehan and @NGA-TRAN

Merged via the queue into apache:main with commit e4f2b49 Nov 3, 2025
28 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants