Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl OptimizerRule for EliminateCrossJoin {
filter.input.as_ref(),
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
null_equals_null: false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rule also has it's own logic for identifying join keys in extract_possible_join_keys function. Shouldn't is not distinct from operator be supported there (perhaps by reusing logic from extract_equijoin_predicated_rule)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, seems like we should probably unify that logic before making this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also, not sure that this rule shouldn't have any interactions with null_equals_null = true joins -- what is the current behavior of such queries and won't this additional filter change it drastically?

..
}) | LogicalPlan::CrossJoin(_)
);
Expand Down Expand Up @@ -124,6 +125,7 @@ impl OptimizerRule for EliminateCrossJoin {
plan,
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
null_equals_null: false,
..
})
) {
Expand Down Expand Up @@ -268,6 +270,7 @@ fn can_flatten_join_inputs(plan: &LogicalPlan) -> bool {
match child {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
null_equals_null: false,
..
})
| LogicalPlan::CrossJoin(_) => {
Expand Down
174 changes: 126 additions & 48 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
// equijoin predicate
Expand Down Expand Up @@ -67,67 +67,145 @@ impl OptimizerRule for ExtractEquijoinPredicate {
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Join(Join {
left,
right,
mut on,
filter: Some(expr),
join_type,
join_constraint,
schema,
null_equals_null,
}) => {
let left_schema = left.schema();
let right_schema = right.schema();
let (equijoin_predicates, non_equijoin_expr) =
split_eq_and_noneq_join_predicate(expr, left_schema, right_schema)?;

if !equijoin_predicates.is_empty() {
on.extend(equijoin_predicates);
Ok(Transformed::yes(LogicalPlan::Join(Join {
left,
right,
on,
filter: non_equijoin_expr,
join_type,
join_constraint,
schema,
null_equals_null,
})))
} else {
Ok(Transformed::no(LogicalPlan::Join(Join {
left,
right,
on,
filter: non_equijoin_expr,
join_type,
join_constraint,
schema,
null_equals_null,
})))
}
}
LogicalPlan::Join(join) => extract_equijoin_predicate(join),
_ => Ok(Transformed::no(plan)),
}
}
}

fn extract_equijoin_predicate(join: Join) -> Result<Transformed<LogicalPlan>> {
fn update_join_predicate(
join: Join,
extra_on: Vec<EquijoinPredicate>,
filter: Option<Expr>,
null_equals_null: bool,
) -> Transformed<LogicalPlan> {
if extra_on.is_empty() {
Transformed::no(LogicalPlan::Join(join))
} else {
let mut on = join.on;
on.extend(extra_on);
Transformed::yes(LogicalPlan::Join(Join {
left: join.left,
right: join.right,
on,
filter,
join_type: join.join_type,
join_constraint: join.join_constraint,
schema: join.schema,
null_equals_null,
}))
}
}
if join.filter.is_none() {
return Ok(Transformed::no(LogicalPlan::Join(join)));
}
let expr = join.filter.as_ref().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will let-else work here to avoid unwrap?


let left_schema = join.left.schema();
let right_schema = join.right.schema();

if join.on.is_empty() {
let (eq_predicates, null_equals_null, non_eq_expr) =
split_eq_and_noneq_join_predicate(expr, left_schema, right_schema)?;
Ok(update_join_predicate(
join,
eq_predicates,
non_eq_expr,
null_equals_null,
))
} else if join.null_equals_null {
let (eq_predicates, non_eq_expr) =
split_eq_and_noneq_join_predicate_nulls_eq(expr, left_schema, right_schema)?;

Ok(update_join_predicate(
join,
eq_predicates,
non_eq_expr,
true,
))
} else {
let (eq_predicates, non_eq_expr) =
split_eq_and_noneq_join_predicate_nulls_not_eq(
expr,
left_schema,
right_schema,
)?;

Ok(update_join_predicate(
join,
eq_predicates,
non_eq_expr,
false,
))
}
}

fn split_eq_and_noneq_join_predicate(
filter: Expr,
filter: &Expr,
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, bool, Option<Expr>)> {
let (eq, noneq) = split_eq_and_noneq_join_predicate_nulls_not_eq(
filter,
left_schema,
right_schema,
)?;
if !eq.is_empty() {
Ok((eq, false, noneq))
} else {
let (eq, noneq) = split_eq_and_noneq_join_predicate_nulls_eq(
filter,
left_schema,
right_schema,
)?;
Ok((eq, true, noneq))
}
}

fn split_eq_and_noneq_join_predicate_nulls_eq(
filter: &Expr,
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
split_eq_and_noneq_join_predicate_impl(
filter,
left_schema,
right_schema,
Operator::IsNotDistinctFrom,
)
}

fn split_eq_and_noneq_join_predicate_nulls_not_eq(
filter: &Expr,
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
split_eq_and_noneq_join_predicate_impl(
filter,
left_schema,
right_schema,
Operator::Eq,
)
}

fn split_eq_and_noneq_join_predicate_impl(
filter: &Expr,
left_schema: &DFSchema,
right_schema: &DFSchema,
eq_op: Operator,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
let exprs = split_conjunction_owned(filter);
let exprs = split_conjunction(filter);
Copy link
Contributor

@korowa korowa Sep 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note: this part (along with cloning exprs) in elses of this function was changed in #10165 as a part of "reducing expr cloning in optimizer" initiative, so, probably, in this place it's preferred to avoid cloning, unless it's required.


let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
for expr in exprs {
match expr {
Expr::BinaryExpr(BinaryExpr {
ref left,
op: Operator::Eq,
op,
ref right,
}) => {
}) if *op == eq_op => {
let join_key_pair =
find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?;

Expand All @@ -138,13 +216,13 @@ fn split_eq_and_noneq_join_predicate(
if can_hash(&left_expr_type) && can_hash(&right_expr_type) {
accum_join_keys.push((left_expr, right_expr));
} else {
accum_filters.push(expr);
accum_filters.push(expr.clone());
}
} else {
accum_filters.push(expr);
accum_filters.push(expr.clone());
}
}
_ => accum_filters.push(expr),
_ => accum_filters.push(expr.clone()),
}
}

Expand Down
44 changes: 44 additions & 0 deletions datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,50 @@ set datafusion.execution.target_partitions = 4;
statement ok
set datafusion.optimizer.repartition_joins = false;

statement ok
DROP TABLE t1;

statement ok
DROP TABLE t2;

# tables for join nulls equals null
statement ok
CREATE TABLE IF NOT EXISTS t1(t1_id INT NULL, t1_int INT) AS VALUES
(11, 1),
(22, 2),
(NULL, 3);

statement ok
CREATE TABLE IF NOT EXISTS t2(t2_id INT NULL, t2_int INT) AS VALUES
(11, 3),
(33, 1),
(NULL, 5),
(NULL, 6);


# IS NOT DISTRINCT can be transformed into equijoin
query TT
EXPLAIN SELECT t1_id, t1_int, t2_int FROM t1 JOIN t2 ON t1_id IS NOT DISTINCT from t2_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be useful to also have the following test cases

  • demonstrating that equality predicates have priority over "is not distinct from" when picking ON conditions from the filter
  • how these joins will work with eliminate_cross_join rule (which if I'm not mistaken also works for filter over inner joins event with ON conditions)
  • how these join will be treated by push_down_filter -- in some cases it seems to be able to push filters into ON conditions -- if so, resulting query won't be correct in case of null_equals_null joins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestions for:

how these join will be treated by push_down_filter -- in some cases it seems to be able to push filters into ON conditions -- if so, resulting query won't be correct in case of null_equals_null joins

I thought these would be pushed into the filter and not the on (https://github.com/eejbyfeldt/datafusion/blob/1d4cf53169fd9a611a7e24e9ac4880d3e24b131e/datafusion/expr/src/logical_plan/plan.rs#L2965-L2968) and therefore still be correct.

----
logical_plan
01)Projection: t1.t1_id, t1.t1_int, t2.t2_int
02)--Inner Join: t1.t1_id = t2.t2_id
03)----TableScan: t1 projection=[t1_id, t1_int]
04)----TableScan: t2 projection=[t2_id, t2_int]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0, t1_int@1, t2_int@3]
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]

query III rowsort
SELECT t1_id, t1_int, t2_int FROM t1 JOIN t2 ON t1_id IS NOT DISTINCT from t2_id
----
11 1 3
NULL 3 5
NULL 3 6


statement ok
DROP TABLE t1;

Expand Down