@@ -52,6 +52,7 @@ use datafusion_common::{
5252
5353// backwards compatibility
5454use crate :: display:: PgJsonVisitor ;
55+ use crate :: logical_plan:: tree_node:: unwrap_arc;
5556pub use datafusion_common:: display:: { PlanType , StringifiedPlan , ToStringifiedPlan } ;
5657pub use datafusion_common:: { JoinConstraint , JoinType } ;
5758
@@ -467,6 +468,200 @@ impl LogicalPlan {
467468 self . with_new_exprs ( self . expressions ( ) , inputs. to_vec ( ) )
468469 }
469470
471+ /// Recomputes schema and type information for this LogicalPlan if needed.
472+ ///
473+ /// Some `LogicalPlan`s may need to recompute their schema if the number or
474+ /// type of expressions have been changed (for example due to type
475+ /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
476+ /// its expressions.
477+ ///
478+ /// Some `LogicalPlan`s schema is unaffected by any changes to their
479+ /// expressions. For example [`LogicalPlan::Filter`] schema is always the
480+ /// same as its input schema.
481+ ///
482+ /// # Return value
483+ /// Returns an error if there is some issue recomputing the schema.
484+ ///
485+ /// # Notes
486+ ///
487+ /// * Does not recursively recompute schema for input (child) plans.
488+ pub fn recompute_schema ( self ) -> Result < Self > {
489+ match self {
490+ // Since expr may be different than the previous expr, schema of the projection
491+ // may change. We need to use try_new method instead of try_new_with_schema method.
492+ LogicalPlan :: Projection ( Projection {
493+ expr,
494+ input,
495+ schema : _,
496+ } ) => Projection :: try_new ( expr, input) . map ( LogicalPlan :: Projection ) ,
497+ LogicalPlan :: Dml ( _) => Ok ( self ) ,
498+ LogicalPlan :: Copy ( _) => Ok ( self ) ,
499+ LogicalPlan :: Values ( Values { schema, values } ) => {
500+ // todo it isn't clear why the schema is not recomputed here
501+ Ok ( LogicalPlan :: Values ( Values { schema, values } ) )
502+ }
503+ LogicalPlan :: Filter ( Filter { predicate, input } ) => {
504+ // todo: should this logic be moved to Filter::try_new?
505+
506+ // filter predicates should not contain aliased expressions so we remove any aliases
507+ // before this logic was added we would have aliases within filters such as for
508+ // benchmark q6:
509+ //
510+ // lineitem.l_shipdate >= Date32(\"8766\")
511+ // AND lineitem.l_shipdate < Date32(\"9131\")
512+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >=
513+ // Decimal128(Some(49999999999999),30,15)
514+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <=
515+ // Decimal128(Some(69999999999999),30,15)
516+ // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
517+
518+ let predicate = predicate
519+ . transform_down ( |expr| {
520+ match expr {
521+ Expr :: Exists { .. }
522+ | Expr :: ScalarSubquery ( _)
523+ | Expr :: InSubquery ( _) => {
524+ // subqueries could contain aliases so we don't recurse into those
525+ Ok ( Transformed :: new ( expr, false , TreeNodeRecursion :: Jump ) )
526+ }
527+ Expr :: Alias ( _) => Ok ( Transformed :: new (
528+ expr. unalias ( ) ,
529+ true ,
530+ TreeNodeRecursion :: Jump ,
531+ ) ) ,
532+ _ => Ok ( Transformed :: no ( expr) ) ,
533+ }
534+ } )
535+ . data ( ) ?;
536+
537+ Filter :: try_new ( predicate, input) . map ( LogicalPlan :: Filter )
538+ }
539+ LogicalPlan :: Repartition ( _) => Ok ( self ) ,
540+ LogicalPlan :: Window ( Window {
541+ input,
542+ window_expr,
543+ schema : _,
544+ } ) => Window :: try_new ( window_expr, input) . map ( LogicalPlan :: Window ) ,
545+ LogicalPlan :: Aggregate ( Aggregate {
546+ input,
547+ group_expr,
548+ aggr_expr,
549+ schema : _,
550+ } ) => Aggregate :: try_new ( input, group_expr, aggr_expr)
551+ . map ( LogicalPlan :: Aggregate ) ,
552+ LogicalPlan :: Sort ( _) => Ok ( self ) ,
553+ LogicalPlan :: Join ( Join {
554+ left,
555+ right,
556+ filter,
557+ join_type,
558+ join_constraint,
559+ on,
560+ schema : _,
561+ null_equals_null,
562+ } ) => {
563+ let schema =
564+ build_join_schema ( left. schema ( ) , right. schema ( ) , & join_type) ?;
565+
566+ let new_on: Vec < _ > = on
567+ . into_iter ( )
568+ . map ( |equi_expr| {
569+ // SimplifyExpression rule may add alias to the equi_expr.
570+ ( equi_expr. 0 . unalias ( ) , equi_expr. 1 . unalias ( ) )
571+ } )
572+ . collect ( ) ;
573+
574+ Ok ( LogicalPlan :: Join ( Join {
575+ left,
576+ right,
577+ join_type,
578+ join_constraint,
579+ on : new_on,
580+ filter,
581+ schema : DFSchemaRef :: new ( schema) ,
582+ null_equals_null,
583+ } ) )
584+ }
585+ LogicalPlan :: CrossJoin ( CrossJoin {
586+ left,
587+ right,
588+ schema : _,
589+ } ) => {
590+ let join_schema =
591+ build_join_schema ( left. schema ( ) , right. schema ( ) , & JoinType :: Inner ) ?;
592+
593+ Ok ( LogicalPlan :: CrossJoin ( CrossJoin {
594+ left,
595+ right,
596+ schema : join_schema. into ( ) ,
597+ } ) )
598+ }
599+ LogicalPlan :: Subquery ( _) => Ok ( self ) ,
600+ LogicalPlan :: SubqueryAlias ( SubqueryAlias {
601+ input,
602+ alias,
603+ schema : _,
604+ } ) => SubqueryAlias :: try_new ( input, alias) . map ( LogicalPlan :: SubqueryAlias ) ,
605+ LogicalPlan :: Limit ( _) => Ok ( self ) ,
606+ LogicalPlan :: Ddl ( _) => Ok ( self ) ,
607+ LogicalPlan :: Extension ( Extension { node } ) => {
608+ // todo make an API that does not require cloning
609+ // This requires a copy of the extension nodes expressions and inputs
610+ let expr = node. expressions ( ) ;
611+ let inputs: Vec < _ > = node. inputs ( ) . into_iter ( ) . cloned ( ) . collect ( ) ;
612+ Ok ( LogicalPlan :: Extension ( Extension {
613+ node : node. from_template ( & expr, & inputs) ,
614+ } ) )
615+ }
616+ LogicalPlan :: Union ( Union { inputs, schema } ) => {
617+ let input_schema = inputs[ 0 ] . schema ( ) ;
618+ // If inputs are not pruned do not change schema
619+ // TODO this seems wrong (shouldn't we always use the schema of the input?)
620+ let schema = if schema. fields ( ) . len ( ) == input_schema. fields ( ) . len ( ) {
621+ schema. clone ( )
622+ } else {
623+ input_schema. clone ( )
624+ } ;
625+ Ok ( LogicalPlan :: Union ( Union { inputs, schema } ) )
626+ }
627+ LogicalPlan :: Distinct ( distinct) => {
628+ let distinct = match distinct {
629+ Distinct :: All ( input) => Distinct :: All ( input) ,
630+ Distinct :: On ( DistinctOn {
631+ on_expr,
632+ select_expr,
633+ sort_expr,
634+ input,
635+ schema : _,
636+ } ) => Distinct :: On ( DistinctOn :: try_new (
637+ on_expr,
638+ select_expr,
639+ sort_expr,
640+ input,
641+ ) ?) ,
642+ } ;
643+ Ok ( LogicalPlan :: Distinct ( distinct) )
644+ }
645+ LogicalPlan :: RecursiveQuery ( _) => Ok ( self ) ,
646+ LogicalPlan :: Analyze ( _) => Ok ( self ) ,
647+ LogicalPlan :: Explain ( _) => Ok ( self ) ,
648+ LogicalPlan :: Prepare ( _) => Ok ( self ) ,
649+ LogicalPlan :: TableScan ( _) => Ok ( self ) ,
650+ LogicalPlan :: EmptyRelation ( _) => Ok ( self ) ,
651+ LogicalPlan :: Statement ( _) => Ok ( self ) ,
652+ LogicalPlan :: DescribeTable ( _) => Ok ( self ) ,
653+ LogicalPlan :: Unnest ( Unnest {
654+ input,
655+ columns,
656+ schema : _,
657+ options,
658+ } ) => {
659+ // Update schema with unnested column type.
660+ unnest_with_options ( unwrap_arc ( input) , columns, options)
661+ }
662+ }
663+ }
664+
470665 /// Returns a new `LogicalPlan` based on `self` with inputs and
471666 /// expressions replaced.
472667 ///
0 commit comments