Skip to content

Conversation

@DhamoPS
Copy link
Contributor

@DhamoPS DhamoPS commented Sep 1, 2022

Which issue does this PR close?

Closes #78 .

Rationale for this change

Join Predicates which are part of OR expressions of FILTER PREDICATES, were not pulled properly. Due to that, CROSS JOINs are chosen for those joins instead of innerjoin and causes query to run longer. Actually TPCH Q19 was taking more than 2000 sec in my laptop. With Fix, TPCH Q19 is taking ~3 sec.

What changes are included in this PR?

-- planner.rs::predicates which are part of OR expr also pulled to
Joined predicates if left and right child of OR expr are having the same
predicates.
-- tests/sql/subqueries.rs::added testcases for the above fix

Are there any user-facing changes?

No

-- planner.rs::predicates which are part of OR expr also pulled to
Joined predicates if left and right child of OR expr are having the same
predicates.
-- tests/sql/subqueries.rs::added testcases for the above fix
@github-actions github-actions bot added core Core DataFusion crate sql SQL Planner labels Sep 1, 2022
@avantgardnerio
Copy link
Contributor

@andygrove or @alamb can you kick off CI please?

@andygrove andygrove changed the title Dhamo 78 Address performance/execution plan of TPCH query 19 Sep 1, 2022
let expected = r#"Projection: #SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue
Aggregate: groupBy=[[]], aggr=[[SUM(#lineitem.l_extendedprice * Int64(1) - #lineitem.l_discount)]]
Projection: #lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AS InList-falseLiteralAIR REGLiteralAIRColumn-lineitem.l_shipmode, #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") AS BinaryExpr-=LiteralDELIVER IN PERSONColumn-lineitem.l_shipinstruct, #lineitem.l_quantity, #lineitem.l_extendedprice, #lineitem.l_discount, #part.p_brand, #part.p_size, #part.p_container
Filter: #part.p_brand = Utf8("Brand#12") AND #part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND #lineitem.l_quantity >= Int64(1) AND #lineitem.l_quantity <= Int64(11) AND #part.p_size BETWEEN Int64(1) AND Int64(5) OR #part.p_brand = Utf8("Brand#23") AND #part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND #lineitem.l_quantity >= Int64(10) AND #lineitem.l_quantity <= Int64(20) AND #part.p_size BETWEEN Int64(1) AND Int64(10) OR #part.p_brand = Utf8("Brand#34") AND #part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND #lineitem.l_quantity >= Int64(20) AND #lineitem.l_quantity <= Int64(30) AND #part.p_size BETWEEN Int64(1) AND Int64(15)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is concerning to me, but maybe it is a bug in how Display works for filters:

#part.p_brand = Utf8("Brand#12") AND #part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND #lineitem.l_quantity >= Int64(1) AND #lineitem.l_quantity <= Int64(11) AND #part.p_size BETWEEN Int64(1) AND Int64(5)

Should have parenthesis around it - each of the conjunctions should before they are joined into a disjunction. Does anyone know if that a problem with the plan or the formatter?

@DhamoPS did you compare the results with postgres or spark to verify they were correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@avantgardnerio This is display format issue. I have verified the results with Postgres and results are correct.

// assert data
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-----------+-------------------------------+--------------------------+-------------------------------------+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now have the ability to specify test data isolated to this particular test. It might be good to use custom test data to show a result vs just empty. See the tpch_q4_correlated() test for an example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I have committed tests with tempdata as you have suggested.
let expected = vec![ "+-----------+-------------------------------+--------------------------+-------------------------------------+", "| p_partkey | SUM(lineitem.l_extendedprice) | AVG(lineitem.l_discount) | COUNT(DISTINCT partsupp.ps_suppkey) |", "+-----------+-------------------------------+--------------------------+-------------------------------------+", "| 63700 | 13309.6 | 0.1 | 1 |", "+-----------+-------------------------------+--------------------------+-------------------------------------+", ];

Copy link
Contributor

@avantgardnerio avantgardnerio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current test only covers removing expressions from conjunctions. I'd like to see a test cover disjunctions as well, to ensure the results of the execution plan were not altered.

I like the simplified test BTW :)

}
},
// Fix for issue#78 join predicates from inside of OR expr also pulled up properly.
Operator::Or => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can pull out ors the same way we would part of an and - altering a disjunction will affect the outcome. i.e. if it was l.partkey=s.partkey or l.price < 5.00 and we extract the first part into a join clause, we will now only see ones with prices < 5, not ones that matched or were less than 5. A contrived example I'm sure, but I think I'd have to see a counter-example working before I am confident we didn't alter the results.

I think a better place to solve this might be with an optimizer rule, instead of in the planner - in an optimizer rule we could always just return Err() if the situation isn't perfectly to our liking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I agree that OR predicates need to be handled differently.
OR predicates are handled in below code.

        ` Operator::Or => {
            let mut left_join_keys = vec![];
            let mut right_join_keys = vec![];

            extract_possible_join_keys(left, &mut left_join_keys)?;
            extract_possible_join_keys(right, &mut right_join_keys)?;

            intersect( accum, &left_join_keys, &right_join_keys)
        }`            

In case of OR predicates, we pull the predicates if both left and right child are having same JOIN predicates.
insect() function would ensure that we don't pull the all OR predicates.

In the example given above, l.partkey=s.partkey or l.price < 5.00, the left child of OR predicate would be l.partkey=s.partkey and right child of OR predicate would be ` l.price < 5.00'. There is no common predicates on left and right child of OR expr and so these predicates are not pulled to Join Predicates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not pull the predicates from disjunctions unless both left and right child of OR expr are having common conjunctions, since it will affect the query results. The idea for this fix is, "If there are common predicates in left and right child of OR expression then we should move those predicates to Join predicates. In case of Q19, it has common join predicate in all 3 OR expressions and so that Join predicate is pulled up to JOIN node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I think this kind of rewrite should be done in an SQL optimizer pass so that it will apply to any query plan (e.g. that came from the DataFrame API) rather than only SQL

@alamb
Copy link
Contributor

alamb commented Sep 2, 2022

I plan to review this carefully tomorrow

The Join predicates which are common in all OR expressions are pulled out for JOIN predicates.
The new plan is imported and provides better performance since innerjoin is used instead of crossjoin.
@DhamoPS
Copy link
Contributor Author

DhamoPS commented Sep 2, 2022

I have fixed the issue in testcases.
@andygrove or @alamb can you kick off CI please?

where
(
p_partkey = l_partkey
and p_brand = 'Brand#12'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question about disjunctions can be rephrased as "what if this and was an or? would we still try to lift it out into a join predicate?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that scenario, we don't lift the predicate up. I have tested the behavior by running the modified tests.

Using local temp data in testcase so that we get resultset value for the given predicates.
@codecov-commenter
Copy link

Codecov Report

Merging #3334 (11178a3) into master (9a5f17e) will decrease coverage by 0.34%.
The diff coverage is 89.47%.

@@            Coverage Diff             @@
##           master    #3334      +/-   ##
==========================================
- Coverage   85.84%   85.49%   -0.35%     
==========================================
  Files         283      294      +11     
  Lines       51658    54096    +2438     
==========================================
+ Hits        44347    46251    +1904     
- Misses       7311     7845     +534     
Impacted Files Coverage Δ
datafusion/core/tests/sql/predicates.rs 100.00% <ø> (ø)
datafusion/sql/src/planner.rs 80.49% <80.95%> (-1.42%) ⬇️
datafusion/core/tests/sql/subqueries.rs 94.93% <100.00%> (+0.61%) ⬆️
datafusion/core/tests/provider_filter_pushdown.rs 70.45% <0.00%> (-14.48%) ⬇️
datafusion/expr/src/expr_visitor.rs 53.68% <0.00%> (-10.07%) ⬇️
datafusion/expr/src/expr.rs 75.57% <0.00%> (-9.82%) ⬇️
datafusion/expr/src/expr_schema.rs 61.67% <0.00%> (-7.92%) ⬇️
datafusion/sql/src/utils.rs 44.63% <0.00%> (-7.33%) ⬇️
datafusion/expr/src/expr_rewriter.rs 79.69% <0.00%> (-6.40%) ⬇️
datafusion/physical-expr/src/crypto_expressions.rs 56.11% <0.00%> (-6.39%) ⬇️
... and 105 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this work @DhamoPS 🙇 -- the cross join plan is clearly a large problem

My major comment is that there is code (from @xudong963 in #2858) which is attempts to do this exact rewrite

https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs

Clearly it is not working in the intended way (though now that I leave that comment, I see the comments in that module are limited).

@DhamoPS can you look at https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs and let us know if you think it is redundant with the code in this PR?

" CrossJoin: [l_partkey:Int64, l_quantity:Float64, p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" TableScan: lineitem projection=[l_partkey, l_quantity] [l_partkey:Int64, l_quantity:Float64]",
" TableScan: part projection=[p_partkey, p_brand, p_size] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
" Filter: #part.p_brand = Utf8(\"Brand#12\") AND #lineitem.l_quantity >= Int64(1) AND #lineitem.l_quantity <= Int64(11) AND #part.p_size BETWEEN Int64(1) AND Int64(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #lineitem.l_quantity >= Int64(10) AND #lineitem.l_quantity <= Int64(20) AND #part.p_size BETWEEN Int64(1) AND Int64(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #lineitem.l_quantity >= Int64(20) AND #lineitem.l_quantity <= Int64(30) AND #part.p_size BETWEEN Int64(1) AND Int64(15) [l_partkey:Int64, l_quantity:Float64, p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely the better plan 👍 @DhamoPS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb I have checked the #2858. Even though, it would help in handling of disjunctive predicates, it does not solve the problem of #78. I understand that we need to write these rules in optimizer.rs, so that it would be applicable for DATAFRAME API plans as well.
I would convert my fix into optimizer rule as suggested. CrossJoins must be converted to InnerJoins if there is one or more common predicates between those tables.

}
},
// Fix for issue#78 join predicates from inside of OR expr also pulled up properly.
Operator::Or => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I think this kind of rewrite should be done in an SQL optimizer pass so that it will apply to any query plan (e.g. that came from the DataFrame API) rather than only SQL

@alamb
Copy link
Contributor

alamb commented Sep 3, 2022

I have filed #3351 to hopefully improve the documentation

Copy link
Member

@xudong963 xudong963 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we'll transfer cross join to inner join in the planner phase(not optimizer phase), see the issue #2859.

So after https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs, though we extracted join keys from predicates, we still can't transfer cross join to inner join for tpch 19.

I think we should move cross join -> inner join related logic from planner to optimizer. It's a common rewrite rule for optimizer and can bring more potential optimization.

@alamb
Copy link
Contributor

alamb commented Sep 12, 2022

Marking as draft to signify this PR has planned changes (and make it easier to find PRs that are in need of review). Please mark "ready for review" when it is next ready or if this change was a mistake.

@alamb alamb marked this pull request as draft September 12, 2022 11:05
@DhamoPS
Copy link
Contributor Author

DhamoPS commented Sep 12, 2022

Marking as draft to signify this PR has planned changes (and make it easier to find PRs that are in need of review). Please mark "ready for review" when it is next ready or if this change was a mistake.

@alamb ok. I am working on a fix for this issue in optimizer rules. I will create new PR for those changes and close this PR.

@alamb
Copy link
Contributor

alamb commented Sep 12, 2022 via email

@DhamoPS
Copy link
Contributor Author

DhamoPS commented Sep 14, 2022

@alamb @avantgardnerio @xudong963 I have created new PR for the issue #78
#3482
Please review the changes.

@DhamoPS DhamoPS closed this Sep 14, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate sql SQL Planner

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Address performance/execution plan of TPCH query 19

5 participants