Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 40 additions & 26 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::utils::{
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{context, plan_err, DataFusionError};
use datafusion_common::context;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep DataFusionError, following code we can use Result instead of datafusion_common::Result

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe we could just do use datafusion_common::Result

use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use std::sync::Arc;
Expand Down Expand Up @@ -75,6 +75,16 @@ impl OptimizerRule for DecorrelateWhereExists {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
Expand All @@ -91,19 +101,28 @@ impl OptimizerRule for DecorrelateWhereExists {
)?);
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(optimized_plan);
return Ok(Some(optimized_plan));
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
for subquery in subqueries {
cur_input = optimize_exists(&subquery, &cur_input, &other_exprs)?;
if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
{
cur_input = x;
} else {
return Ok(None);
}
}
Ok(cur_input)
Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}
}
}
Expand Down Expand Up @@ -132,20 +151,22 @@ fn optimize_exists(
query_info: &SubqueryInfo,
outer_input: &LogicalPlan,
outer_other_exprs: &[Expr],
) -> datafusion_common::Result<LogicalPlan> {
) -> datafusion_common::Result<Option<LogicalPlan>> {
let subqry_filter = match query_info.query.subquery.as_ref() {
LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() {
LogicalPlan::Projection(subqry_proj) => {
Filter::try_from_plan(&subqry_proj.input)
}
_ => Err(DataFusionError::NotImplemented(
"Subquery currently only supports distinct or projection".to_string(),
)),
_ => {
// Subquery currently only supports distinct or projection
return Ok(None);
}
},
LogicalPlan::Projection(subqry_proj) => Filter::try_from_plan(&subqry_proj.input),
_ => Err(DataFusionError::NotImplemented(
"Subquery currently only supports distinct or projection".to_string(),
)),
_ => {
// Subquery currently only supports distinct or projection
return Ok(None);
}
}
.map_err(|e| context!("cannot optimize non-correlated subquery", e))?;

Expand All @@ -159,7 +180,8 @@ fn optimize_exists(
let (outer_cols, subqry_cols, join_filters) =
exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?;
if subqry_cols.is_empty() || outer_cols.is_empty() {
plan_err!("cannot optimize non-correlated subquery")?;
// cannot optimize non-correlated subquery
return Ok(None);
}

// build subquery side of join - the thing the subquery was querying
Expand Down Expand Up @@ -188,7 +210,7 @@ fn optimize_exists(
}

let result = new_plan.build()?;
Ok(result)
Ok(Some(result))
}

struct SubqueryInfo {
Expand Down Expand Up @@ -318,9 +340,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand All @@ -339,9 +359,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand All @@ -360,9 +378,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand Down Expand Up @@ -426,9 +442,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand Down
21 changes: 18 additions & 3 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ use std::time::Instant;
/// way. If there are no suitable transformations for the input plan,
/// the optimizer can simply return it as is.
pub trait OptimizerRule {
/// Rewrite `plan` to an optimized form
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
self.optimize(plan, optimizer_config).map(Some)
}

/// Rewrite `plan` to an optimized form. This method will eventually be deprecated and
/// replace by `try_optimize`.
fn optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -209,13 +220,17 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);

for rule in &self.rules {
let result = rule.optimize(&new_plan, optimizer_config);
let result = rule.try_optimize(&new_plan, optimizer_config);
match result {
Ok(plan) => {
Ok(Some(plan)) => {
new_plan = plan;
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
}
Ok(None) => {
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
}
Err(ref e) => {
if optimizer_config.skip_failing_rules {
// Note to future readers: if you see this warning it signals a
Expand Down
8 changes: 8 additions & 0 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,11 @@ pub fn assert_optimizer_err(
}
}
}

pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) {
let new_plan = rule.optimize(plan, &mut OptimizerConfig::new()).unwrap();
assert_eq!(
format!("{}", plan.display_indent()),
format!("{}", new_plan.display_indent())
);
}
11 changes: 5 additions & 6 deletions datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ pub fn optimize_children(
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
let new_inputs = plan
.inputs()
.into_iter()
.map(|plan| optimizer.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

let mut new_inputs = Vec::with_capacity(plan.inputs().len());
for input in plan.inputs() {
let new_input = optimizer.try_optimize(input, optimizer_config)?;
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
from_plan(plan, &new_exprs, &new_inputs)
}

Expand Down