Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,34 @@ impl Expr {
}
}

/// Recursively potentially multiple aliases from an expression.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is trim_expr renamed and with examples

///
/// If the expression is not an alias, the expression is returned unchanged.
/// This method removes directly nested aliases, but not other nested
/// aliases.
///
/// # Example
/// ```
/// # use datafusion_expr::col;
/// // `foo as "bar"` is unaliased to `foo`
/// let expr = col("foo").alias("bar");
/// assert_eq!(expr.unalias_nested(), col("foo"));
///
/// // `foo as "bar" + baz` is not unaliased
/// let expr = col("foo").alias("bar") + col("baz");
/// assert_eq!(expr.clone().unalias_nested(), expr);
///
/// // `foo as "bar" as "baz" is unalaised to foo
/// let expr = col("foo").alias("bar").alias("baz");
/// assert_eq!(expr.unalias_nested(), col("foo"));
/// ```
pub fn unalias_nested(self) -> Expr {
match self {
Expr::Alias(alias) => alias.expr.unalias_nested(),
_ => self,
}
}

/// Return `self IN <list>` if `negated` is false, otherwise
/// return `self NOT IN <list>`.a
pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
Expand Down
195 changes: 195 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_common::{

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::logical_plan::tree_node::unwrap_arc;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -467,6 +468,200 @@ impl LogicalPlan {
self.with_new_exprs(self.expressions(), inputs.to_vec())
}

/// Recomputes schema and type information for this LogicalPlan if needed.
///
/// Some `LogicalPlan`s may need to recompute their schema if the number or
/// type of expressions have been changed (for example due to type
/// coercion). For example [`LogicalPlan::Projection`]s schema depends on
/// its expressions.
///
/// Some `LogicalPlan`s schema is unaffected by any changes to their
/// expressions. For example [`LogicalPlan::Filter`] schema is always the
/// same as its input schema.
///
/// # Return value
/// Returns an error if there is some issue recomputing the schema.
///
/// # Notes
///
/// * Does not recursively recompute schema for input (child) plans.
pub fn recompute_schema(self) -> Result<Self> {
match self {
// Since expr may be different than the previous expr, schema of the projection
// may change. We need to use try_new method instead of try_new_with_schema method.
LogicalPlan::Projection(Projection {
expr,
input,
schema: _,
}) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
LogicalPlan::Dml(_) => Ok(self),
LogicalPlan::Copy(_) => Ok(self),
LogicalPlan::Values(Values { schema, values }) => {
// todo it isn't clear why the schema is not recomputed here
Ok(LogicalPlan::Values(Values { schema, values }))
}
LogicalPlan::Filter(Filter { predicate, input }) => {
// todo: should this logic be moved to Filter::try_new?

// filter predicates should not contain aliased expressions so we remove any aliases
// before this logic was added we would have aliases within filters such as for
// benchmark q6:
//
// lineitem.l_shipdate >= Date32(\"8766\")
// AND lineitem.l_shipdate < Date32(\"9131\")
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >=
// Decimal128(Some(49999999999999),30,15)
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <=
// Decimal128(Some(69999999999999),30,15)
// AND lineitem.l_quantity < Decimal128(Some(2400),15,2)

let predicate = predicate
.transform_down(|expr| {
match expr {
Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::InSubquery(_) => {
// subqueries could contain aliases so we don't recurse into those
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
}
Expr::Alias(_) => Ok(Transformed::new(
expr.unalias(),
true,
TreeNodeRecursion::Jump,
)),
_ => Ok(Transformed::no(expr)),
}
})
.data()?;

Filter::try_new(predicate, input).map(LogicalPlan::Filter)
}
LogicalPlan::Repartition(_) => Ok(self),
LogicalPlan::Window(Window {
input,
window_expr,
schema: _,
}) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
schema: _,
}) => Aggregate::try_new(input, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate),
LogicalPlan::Sort(_) => Ok(self),
LogicalPlan::Join(Join {
left,
right,
filter,
join_type,
join_constraint,
on,
schema: _,
null_equals_null,
}) => {
let schema =
build_join_schema(left.schema(), right.schema(), &join_type)?;

let new_on: Vec<_> = on
.into_iter()
.map(|equi_expr| {
// SimplifyExpression rule may add alias to the equi_expr.
(equi_expr.0.unalias(), equi_expr.1.unalias())
})
.collect();

Ok(LogicalPlan::Join(Join {
left,
right,
join_type,
join_constraint,
on: new_on,
filter,
schema: DFSchemaRef::new(schema),
null_equals_null,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: _,
}) => {
let join_schema =
build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;

Ok(LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: join_schema.into(),
}))
}
LogicalPlan::Subquery(_) => Ok(self),
LogicalPlan::SubqueryAlias(SubqueryAlias {
input,
alias,
schema: _,
}) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
LogicalPlan::Limit(_) => Ok(self),
LogicalPlan::Ddl(_) => Ok(self),
LogicalPlan::Extension(Extension { node }) => {
// todo make an API that does not require cloning
// This requires a copy of the extension nodes expressions and inputs
let expr = node.expressions();
let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
Ok(LogicalPlan::Extension(Extension {
node: node.from_template(&expr, &inputs),
}))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema
// TODO this seems wrong (shouldn't we always use the schema of the input?)
let schema = if schema.fields().len() == input_schema.fields().len() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that shouldn't happen.... we can follow up to run tests with only 1 schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it doesn't make sense -- however, it is the same logic as in with_new_exprs:

LogicalPlan::Union(Union { schema, .. }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema.
let schema = if schema.fields().len() == input_schema.fields().len() {
schema.clone()
} else {
input_schema.clone()
};
Ok(LogicalPlan::Union(Union {
inputs: inputs.into_iter().map(Arc::new).collect(),
schema,
}))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #10442

schema.clone()
} else {
input_schema.clone()
};
Ok(LogicalPlan::Union(Union { inputs, schema }))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(input) => Distinct::All(input),
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
input,
schema: _,
}) => Distinct::On(DistinctOn::try_new(
on_expr,
select_expr,
sort_expr,
input,
)?),
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::RecursiveQuery(_) => Ok(self),
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
LogicalPlan::DescribeTable(_) => Ok(self),
LogicalPlan::Unnest(Unnest {
input,
columns,
schema: _,
options,
}) => {
// Update schema with unnested column type.
unnest_with_options(unwrap_arc(input), columns, options)
}
}
}

/// Returns a new `LogicalPlan` based on `self` with inputs and
/// expressions replaced.
///
Expand Down
Loading