@@ -49,12 +49,11 @@ use crate::physical_plan::{
4949use arrow:: compute:: SortOptions ;
5050use datafusion_common:: tree_node:: { Transformed , TreeNode , VisitRecursion } ;
5151use datafusion_expr:: logical_plan:: JoinType ;
52- use datafusion_physical_expr:: equivalence:: EquivalenceProperties ;
5352use datafusion_physical_expr:: expressions:: { Column , NoOp } ;
54- use datafusion_physical_expr:: utils:: {
55- map_columns_before_projection, ordering_satisfy_requirement_concrete,
53+ use datafusion_physical_expr:: utils:: map_columns_before_projection;
54+ use datafusion_physical_expr:: {
55+ physical_exprs_equal, EquivalenceProperties , PhysicalExpr ,
5656} ;
57- use datafusion_physical_expr:: { expr_list_eq_strict_order, PhysicalExpr } ;
5857use datafusion_physical_plan:: unbounded_output;
5958use datafusion_physical_plan:: windows:: { get_best_fitting_window, BoundedWindowAggExec } ;
6059
@@ -498,7 +497,7 @@ fn reorder_aggregate_keys(
498497
499498 if parent_required. len ( ) != output_exprs. len ( )
500499 || !agg_exec. group_by ( ) . null_expr ( ) . is_empty ( )
501- || expr_list_eq_strict_order ( & output_exprs, parent_required)
500+ || physical_exprs_equal ( & output_exprs, parent_required)
502501 {
503502 Ok ( PlanWithKeyRequirements :: new ( agg_plan) )
504503 } else {
@@ -564,13 +563,11 @@ fn reorder_aggregate_keys(
564563 Arc :: new ( Column :: new (
565564 name,
566565 agg_schema. index_of ( name) . unwrap ( ) ,
567- ) )
568- as Arc < dyn PhysicalExpr > ,
566+ ) ) as _ ,
569567 name. to_owned ( ) ,
570568 )
571569 } )
572570 . collect :: < Vec < _ > > ( ) ;
573- let agg_schema = new_final_agg. schema ( ) ;
574571 let agg_fields = agg_schema. fields ( ) ;
575572 for ( idx, field) in
576573 agg_fields. iter ( ) . enumerate ( ) . skip ( output_columns. len ( ) )
@@ -706,10 +703,9 @@ pub(crate) fn reorder_join_keys_to_inputs(
706703 ) {
707704 if !new_positions. is_empty ( ) {
708705 let new_join_on = new_join_conditions ( & left_keys, & right_keys) ;
709- let mut new_sort_options = vec ! [ ] ;
710- for idx in 0 ..sort_options. len ( ) {
711- new_sort_options. push ( sort_options[ new_positions[ idx] ] )
712- }
706+ let new_sort_options = ( 0 ..sort_options. len ( ) )
707+ . map ( |idx| sort_options[ new_positions[ idx] ] )
708+ . collect ( ) ;
713709 return Ok ( Arc :: new ( SortMergeJoinExec :: try_new (
714710 left. clone ( ) ,
715711 right. clone ( ) ,
@@ -757,39 +753,40 @@ fn try_reorder(
757753 expected : & [ Arc < dyn PhysicalExpr > ] ,
758754 equivalence_properties : & EquivalenceProperties ,
759755) -> Option < ( JoinKeyPairs , Vec < usize > ) > {
756+ let eq_groups = equivalence_properties. eq_group ( ) ;
760757 let mut normalized_expected = vec ! [ ] ;
761758 let mut normalized_left_keys = vec ! [ ] ;
762759 let mut normalized_right_keys = vec ! [ ] ;
763760 if join_keys. left_keys . len ( ) != expected. len ( ) {
764761 return None ;
765762 }
766- if expr_list_eq_strict_order ( expected, & join_keys. left_keys )
767- || expr_list_eq_strict_order ( expected, & join_keys. right_keys )
763+ if physical_exprs_equal ( expected, & join_keys. left_keys )
764+ || physical_exprs_equal ( expected, & join_keys. right_keys )
768765 {
769766 return Some ( ( join_keys, vec ! [ ] ) ) ;
770- } else if !equivalence_properties. classes ( ) . is_empty ( ) {
767+ } else if !equivalence_properties. eq_group ( ) . is_empty ( ) {
771768 normalized_expected = expected
772769 . iter ( )
773- . map ( |e| equivalence_properties . normalize_expr ( e. clone ( ) ) )
770+ . map ( |e| eq_groups . normalize_expr ( e. clone ( ) ) )
774771 . collect :: < Vec < _ > > ( ) ;
775772 assert_eq ! ( normalized_expected. len( ) , expected. len( ) ) ;
776773
777774 normalized_left_keys = join_keys
778775 . left_keys
779776 . iter ( )
780- . map ( |e| equivalence_properties . normalize_expr ( e. clone ( ) ) )
777+ . map ( |e| eq_groups . normalize_expr ( e. clone ( ) ) )
781778 . collect :: < Vec < _ > > ( ) ;
782779 assert_eq ! ( join_keys. left_keys. len( ) , normalized_left_keys. len( ) ) ;
783780
784781 normalized_right_keys = join_keys
785782 . right_keys
786783 . iter ( )
787- . map ( |e| equivalence_properties . normalize_expr ( e. clone ( ) ) )
784+ . map ( |e| eq_groups . normalize_expr ( e. clone ( ) ) )
788785 . collect :: < Vec < _ > > ( ) ;
789786 assert_eq ! ( join_keys. right_keys. len( ) , normalized_right_keys. len( ) ) ;
790787
791- if expr_list_eq_strict_order ( & normalized_expected, & normalized_left_keys)
792- || expr_list_eq_strict_order ( & normalized_expected, & normalized_right_keys)
788+ if physical_exprs_equal ( & normalized_expected, & normalized_left_keys)
789+ || physical_exprs_equal ( & normalized_expected, & normalized_right_keys)
793790 {
794791 return Some ( ( join_keys, vec ! [ ] ) ) ;
795792 }
@@ -870,7 +867,7 @@ fn new_join_conditions(
870867 r_key. as_any ( ) . downcast_ref :: < Column > ( ) . unwrap ( ) . clone ( ) ,
871868 )
872869 } )
873- . collect :: < Vec < _ > > ( )
870+ . collect ( )
874871}
875872
876873/// Updates `dist_onward` such that, to keep track of
@@ -935,9 +932,9 @@ fn add_roundrobin_on_top(
935932 let should_preserve_ordering = input. output_ordering ( ) . is_some ( ) ;
936933
937934 let partitioning = Partitioning :: RoundRobinBatch ( n_target) ;
938- let repartition = RepartitionExec :: try_new ( input, partitioning) ?
939- . with_preserve_order ( should_preserve_ordering) ;
940- let new_plan = Arc :: new ( repartition ) as Arc < dyn ExecutionPlan > ;
935+ let repartition = RepartitionExec :: try_new ( input, partitioning) ?;
936+ let new_plan = Arc :: new ( repartition . with_preserve_order ( should_preserve_ordering) )
937+ as Arc < dyn ExecutionPlan > ;
941938
942939 // update distribution onward with new operator
943940 update_distribution_onward ( new_plan. clone ( ) , dist_onward, input_idx) ;
@@ -1011,9 +1008,9 @@ fn add_hash_on_top(
10111008 input
10121009 } ;
10131010 let partitioning = Partitioning :: Hash ( hash_exprs, n_target) ;
1014- let repartition = RepartitionExec :: try_new ( new_plan, partitioning) ?
1015- . with_preserve_order ( should_preserve_ordering ) ;
1016- new_plan = Arc :: new ( repartition) as _ ;
1011+ let repartition = RepartitionExec :: try_new ( new_plan, partitioning) ?;
1012+ new_plan =
1013+ Arc :: new ( repartition. with_preserve_order ( should_preserve_ordering ) ) as _ ;
10171014
10181015 // update distribution onward with new operator
10191016 update_distribution_onward ( new_plan. clone ( ) , dist_onward, input_idx) ;
@@ -1302,16 +1299,12 @@ fn ensure_distribution(
13021299
13031300 // There is an ordering requirement of the operator:
13041301 if let Some ( required_input_ordering) = required_input_ordering {
1305- let existing_ordering = child. output_ordering ( ) . unwrap_or ( & [ ] ) ;
13061302 // Either:
13071303 // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
13081304 // - using order preserving variant is not desirable.
1309- let ordering_satisfied = ordering_satisfy_requirement_concrete (
1310- existing_ordering,
1311- required_input_ordering,
1312- || child. equivalence_properties ( ) ,
1313- || child. ordering_equivalence_properties ( ) ,
1314- ) ;
1305+ let ordering_satisfied = child
1306+ . equivalence_properties ( )
1307+ . ordering_satisfy_requirement ( required_input_ordering) ;
13151308 if !ordering_satisfied || !order_preserving_variants_desirable {
13161309 replace_order_preserving_variants ( & mut child, dist_onward) ?;
13171310 // If ordering requirements were satisfied before repartitioning,
@@ -3763,14 +3756,14 @@ mod tests {
37633756 fn repartition_transitively_past_sort_with_filter ( ) -> Result < ( ) > {
37643757 let schema = schema ( ) ;
37653758 let sort_key = vec ! [ PhysicalSortExpr {
3766- expr: col( "c " , & schema) . unwrap( ) ,
3759+ expr: col( "a " , & schema) . unwrap( ) ,
37673760 options: SortOptions :: default ( ) ,
37683761 } ] ;
37693762 let plan = sort_exec ( sort_key, filter_exec ( parquet_exec ( ) ) , false ) ;
37703763
37713764 let expected = & [
3772- "SortPreservingMergeExec: [c@2 ASC]" ,
3773- "SortExec: expr=[c@2 ASC]" ,
3765+ "SortPreservingMergeExec: [a@0 ASC]" ,
3766+ "SortExec: expr=[a@0 ASC]" ,
37743767 // Expect repartition on the input to the sort (as it can benefit from additional parallelism)
37753768 "FilterExec: c@2 = 0" ,
37763769 "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1" ,
@@ -3780,7 +3773,7 @@ mod tests {
37803773 assert_optimized ! ( expected, plan. clone( ) , true ) ;
37813774
37823775 let expected_first_sort_enforcement = & [
3783- "SortExec: expr=[c@2 ASC]" ,
3776+ "SortExec: expr=[a@0 ASC]" ,
37843777 "CoalescePartitionsExec" ,
37853778 "FilterExec: c@2 = 0" ,
37863779 // Expect repartition on the input of the filter (as it can benefit from additional parallelism)
@@ -4357,29 +4350,54 @@ mod tests {
43574350 fn do_not_preserve_ordering_through_repartition ( ) -> Result < ( ) > {
43584351 let schema = schema ( ) ;
43594352 let sort_key = vec ! [ PhysicalSortExpr {
4360- expr: col( "c " , & schema) . unwrap( ) ,
4353+ expr: col( "a " , & schema) . unwrap( ) ,
43614354 options: SortOptions :: default ( ) ,
43624355 } ] ;
43634356 let input = parquet_exec_multiple_sorted ( vec ! [ sort_key. clone( ) ] ) ;
43644357 let physical_plan = sort_preserving_merge_exec ( sort_key, filter_exec ( input) ) ;
43654358
43664359 let expected = & [
4367- "SortPreservingMergeExec: [c@2 ASC]" ,
4368- "SortExec: expr=[c@2 ASC]" ,
4360+ "SortPreservingMergeExec: [a@0 ASC]" ,
4361+ "SortExec: expr=[a@0 ASC]" ,
43694362 "FilterExec: c@2 = 0" ,
43704363 "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2" ,
4371- "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]" ,
4364+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]" ,
43724365 ] ;
43734366
43744367 assert_optimized ! ( expected, physical_plan. clone( ) , true ) ;
43754368
43764369 let expected = & [
4377- "SortExec: expr=[c@2 ASC]" ,
4370+ "SortExec: expr=[a@0 ASC]" ,
4371+ "CoalescePartitionsExec" ,
4372+ "FilterExec: c@2 = 0" ,
4373+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2" ,
4374+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]" ,
4375+ ] ;
4376+ assert_optimized ! ( expected, physical_plan, false ) ;
4377+
4378+ Ok ( ( ) )
4379+ }
4380+
4381+ #[ test]
4382+ fn no_need_for_sort_after_filter ( ) -> Result < ( ) > {
4383+ let schema = schema ( ) ;
4384+ let sort_key = vec ! [ PhysicalSortExpr {
4385+ expr: col( "c" , & schema) . unwrap( ) ,
4386+ options: SortOptions :: default ( ) ,
4387+ } ] ;
4388+ let input = parquet_exec_multiple_sorted ( vec ! [ sort_key. clone( ) ] ) ;
4389+ let physical_plan = sort_preserving_merge_exec ( sort_key, filter_exec ( input) ) ;
4390+
4391+ let expected = & [
4392+ // c remains constant after CoalescePartitionsExec, so the c@2 ASC ordering is already satisfied.
43784393 "CoalescePartitionsExec" ,
4394+ // c is constant after this stage, so the c@2 ASC ordering is already satisfied.
43794395 "FilterExec: c@2 = 0" ,
43804396 "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2" ,
43814397 "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]" ,
43824398 ] ;
4399+
4400+ assert_optimized ! ( expected, physical_plan. clone( ) , true ) ;
43834401 assert_optimized ! ( expected, physical_plan, false ) ;
43844402
43854403 Ok ( ( ) )
0 commit comments