Enhance/Refactor Ordering Equivalence Properties #7566
Conversation
alamb left a comment:
Thank you very much @mustafasrepo
I am approving this PR based on the tests and their improvements, nice work! I also really like the refactoring you have done to the equivalence logic - I think that makes the code easier to understand ❤️
I am concerned with the use of `unwrap_or` to ignore errors, and I think we should address that (if not in this PR then in a future one -- I can file a ticket to track it).
Also, I think there may be additional optimization opportunities in `FilterExec`, which again could be done as a follow-on PR.
FYI @NGA-TRAN
Thanks again
```rust
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
    self.input.ordering_equivalence_properties()
    let stats = self.statistics();
    // Add the columns that have only one value (singleton) after filtering to constants.
```
Why is there a difference between `OrderingEquivalenceProperties` and `EquivalenceProperties`? It seems like using statistics-based equivalence as well as predicate-based equivalence would be relevant to both.
In other words, if the filter has a predicate like `column = 5`, shouldn't `column` be added to the list of constants even if the column had more than one value in the statistics?
> In other words, if the filter has a predicate like `column = 5`, shouldn't `column` be added to the list of constants even if the column had more than one value in the statistics?
In this case, statistics should be able to determine that the column will have a single value (5) from that point onwards, hence I presumed that using statistics is sufficient. However, if there are cases where it is obvious from the predicate that the value is constant but statistics fail to resolve it, we can change this implementation, I think.
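To make the predicate-based alternative concrete, here is a minimal standalone sketch (using a toy expression type, not DataFusion's actual `PhysicalExpr` API) of collecting columns that a conjunctive predicate pins to a single literal. Such columns could be treated as constants after the filter, independent of input statistics:

```rust
// Hypothetical, simplified expression type for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Collect columns that a conjunctive predicate binds to a single literal,
/// e.g. `a = 5 AND b = c` yields ["a"]. These are constant after filtering.
fn constant_columns(pred: &Expr, out: &mut Vec<String>) {
    match pred {
        Expr::And(l, r) => {
            constant_columns(l, out);
            constant_columns(r, out);
        }
        Expr::Eq(l, r) => match (l.as_ref(), r.as_ref()) {
            (Expr::Column(c), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(c)) => {
                out.push(c.clone())
            }
            _ => {}
        },
        _ => {}
    }
}
```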
```rust
let input_column_stats = match input_stats.column_statistics {
    Some(stats) => stats,
    None => return Statistics::default(),
    None => self
```
this seems like it is more general than just `FilterExec` -- shouldn't all `ExecutionPlan`s return statistics that have unbounded columns in the absence of other information? Maybe we should change `Statistics::default()` to do this 🤔
I agree with you. We can remove the `Statistics::default()` implementation and propagate unbounded columns (in the absence of information) from the source. I will try it.
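A minimal sketch of the "unknown per-column statistics" idea being discussed; the field and method names here are illustrative stand-ins, not DataFusion's exact API:

```rust
// Hypothetical, simplified mirror of the idea above.
#[derive(Debug, Clone, PartialEq)]
struct ColumnStatistics {
    min: Option<i64>,
    max: Option<i64>,
    distinct_count: Option<usize>,
}

impl ColumnStatistics {
    /// An "unbounded" column: nothing is known about its values.
    fn unknown() -> Self {
        ColumnStatistics { min: None, max: None, distinct_count: None }
    }
}

struct Statistics {
    num_rows: Option<usize>,
    column_statistics: Vec<ColumnStatistics>,
}

impl Statistics {
    /// Instead of an empty default, return one unknown entry per schema
    /// column, so downstream rules always see per-column placeholders.
    fn new_unknown(num_columns: usize) -> Self {
        Statistics {
            num_rows: None,
            column_statistics: vec![ColumnStatistics::unknown(); num_columns],
        }
    }
}
```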
```rust
let item = PhysicalSortRequirement::from_sort_exprs(item);
let item = prune_sort_reqs_with_constants(&item, &self.constants);
let ranges = get_compatible_ranges(&normalized_sort_reqs, &item);
let mut offset: i64 = 0;
```
why use an `i64` here? It seems like if `offset` were always a `usize`, the math below would be much simpler, as the `as usize` and `as i64` casts could be avoided.
`offset += head.len() as i64 - range as i64;` can result in a negative number, hence we use `i64`. However, I agree that the casting between `i64` and `usize` is a bit awkward; if we can avoid it, that would be great. I will try to rewrite this logic to remove the casting.
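One way to avoid the signed intermediate, sketched with a hypothetical helper (not DataFusion code): keep the index as `usize` and apply the additions and subtractions separately with checked arithmetic, so underflow surfaces as `None` instead of requiring an `i64` detour:

```rust
/// Compute `base + added - removed` entirely in `usize`, without casting
/// through `i64`. Returns `None` if the subtraction would underflow.
fn shift_index(base: usize, added: usize, removed: usize) -> Option<usize> {
    base.checked_add(added)?.checked_sub(removed)
}
```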
```text
------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id]
----CoalesceBatchesExec: target_batch_size=8192
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
```
Is the difference in this plan that the inputs switched order? Do you know why they did?
With the changes in this PR, the filter no longer returns `Statistics::default()`; it propagates `num_rows` up to the `AggregateExec` above. The `join_selection` rule then chooses the filter side as the build side (since the filter side has fewer rows than the other side). Previously, since the number of rows was not propagated, the `join_selection` rule did not change sides. Since we propagate additional information now, the planner can choose a better build side.
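The build-side decision described here can be sketched as below; this is a simplified stand-in for the `join_selection` rule's logic, with hypothetical names, not the actual DataFusion implementation:

```rust
/// Decide whether to swap join inputs so the smaller input becomes the
/// hash-join build side. Without row counts, keep the planned order --
/// which is why propagating `num_rows` through filters matters.
fn should_swap_inputs(left_rows: Option<usize>, right_rows: Option<usize>) -> bool {
    match (left_rows, right_rows) {
        // Swap when the right input is smaller than the current build side.
        (Some(l), Some(r)) => r < l,
        // No statistics available: leave the plan unchanged.
        _ => false,
    }
}
```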
Thank you @mustafasrepo
```text
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: a@1 = 0 AND b@2 = 0
------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
```
This plan without SortExec is awesome. Thanks so much @mustafasrepo for implementing it and @alamb for reviewing it
Which issue does this PR close?
Closes #7162.
Rationale for this change
Ordering equivalence now considers constants during normalization; with this change we can use information available after a filter (when a result is constant after filtering) to produce better plans.
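The normalization idea can be sketched as follows: a column known to be constant after a filter is trivially sorted, so it can be dropped from a sort requirement. This is a simplified stand-in using plain column names rather than DataFusion's sort-requirement types:

```rust
/// Drop columns known to be constant from a sort requirement: a constant
/// column is trivially ordered, so requiring it adds nothing.
fn prune_constants(sort_cols: &[&str], constants: &[&str]) -> Vec<String> {
    sort_cols
        .iter()
        .filter(|c| !constants.contains(*c))
        .map(|c| c.to_string())
        .collect()
}
```

With `b` constant after a `b = 5` filter, a requirement on `[a, b, c]` reduces to `[a, c]`, letting the planner skip a `SortExec` when the input is already ordered on the remaining columns.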
While trying to close #7162, I realized that having `OrderingEquivalence` and `Equivalence` as different instantiations of the same generic is a bit constraining. I took this opportunity to refactor the `OrderingEquivalence` implementation (moving functions to struct methods, and keeping track of constants so that we can use this information during normalization to produce better plans).
What changes are included in this PR?
Are these changes tested?
Yes, new tests are added to show that constants are ignored during sort analysis.
Are there any user-facing changes?