Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 139 additions & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::unbounded_output;

use itertools::izip;

Expand Down Expand Up @@ -185,7 +187,10 @@ impl PhysicalOptimizerRule for EnforceSorting {
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
Ok(adjusted.plan)

adjusted
.plan
.transform_up(&|plan| Ok(Transformed::Yes(replace_with_partial_sort(plan)?)))
}

fn name(&self) -> &str {
Expand All @@ -197,6 +202,42 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
}

fn replace_with_partial_sort(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
let child = sort_plan.children()[0].clone();
if !unbounded_output(&child) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here about why this operator is only used with unbounded output?

I think it is more generally applicable than for just unbounded cases (it would make any plan more streaming as well as require less memory)

We don't have to do it as part of this PR, but I think we should file a follow on ticket to use this operation more generally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not completely sure whether it will be safe to expand partial sort without incorporating the ExternalSorter used in SortExec. Would love to hear your thoughts on this.

Also SortExec with unbounded input is already pipeline breaking, I wanted to first gate this change behind unbounded input and improve functionality without regressing the current behaviour.

Opened an issue for expanding PartialSort to other use cases here: #9153

return Ok(plan);
}

// here we're trying to find the common prefix for sorted columns that is required for the
// sort and already satisfied by the given ordering
let child_eq_properties = child.equivalence_properties();
let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());

let mut common_prefix_length = 0;
while child_eq_properties
.ordering_satisfy_requirement(&sort_req[0..common_prefix_length + 1])
{
common_prefix_length += 1;
}
if common_prefix_length > 0 {
return Ok(Arc::new(
PartialSortExec::new(
sort_plan.expr().to_vec(),
sort_plan.input().clone(),
common_prefix_length,
)
.with_preserve_partitioning(sort_plan.preserve_partitioning())
.with_fetch(sort_plan.fetch()),
));
}
}
Ok(plan)
}

/// This function turns plans of the form
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
Expand Down Expand Up @@ -2205,4 +2246,101 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_replace_with_partial_sort() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("a", &schema)];
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![sort_expr("a", &schema), sort_expr("c", &schema)],
unbounded_input,
);

let expected_input = [
"SortExec: expr=[a@0 ASC,c@2 ASC]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]"
];
let expected_optimized = [
"PartialSortExec: expr=[a@0 ASC,c@2 ASC], common_prefix_length=[1]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_replace_with_partial_sort2() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("a", &schema), sort_expr("c", &schema)];
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![
sort_expr("a", &schema),
sort_expr("c", &schema),
sort_expr("d", &schema),
],
unbounded_input,
);

let expected_input = [
"SortExec: expr=[a@0 ASC,c@2 ASC,d@3 ASC]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]"
];
// let optimized
let expected_optimized = [
"PartialSortExec: expr=[a@0 ASC,c@2 ASC,d@3 ASC], common_prefix_length=[2]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_not_replaced_with_partial_sort_for_bounded_input() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)];
let parquet_input = parquet_exec_sorted(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![
sort_expr("a", &schema),
sort_expr("b", &schema),
sort_expr("c", &schema),
],
parquet_input,
);
let expected_input = [
"SortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[b@1 ASC, c@2 ASC]"
];
let expected_no_change = expected_input;
assert_optimized!(expected_input, expected_no_change, physical_plan, false);
Ok(())
}

#[tokio::test]
async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)];
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![
sort_expr("a", &schema),
sort_expr("b", &schema),
sort_expr("c", &schema),
],
unbounded_input,
);
let expected_input = [
"SortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
];
let expected_no_change = expected_input;
assert_optimized!(expected_input, expected_no_change, physical_plan, true);
Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod builder;
mod cursor;
mod index;
mod merge;
pub mod partial_sort;
pub mod sort;
pub mod sort_preserving_merge;
mod stream;
Expand Down
Loading