Skip to content

Commit f897127

Browse files
committed
add try_optimize() for all rules.
1 parent a5cf577 commit f897127

19 files changed

+460
-150
lines changed

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ impl CommonSubexprEliminate {
7979
})
8080
.collect::<Result<Vec<_>>>()?;
8181

82-
let mut new_input = self.optimize(input, optimizer_config)?;
82+
let mut new_input = self
83+
.try_optimize(input, optimizer_config)?
84+
.unwrap_or_else(|| input.clone());
8385
if !affected_id.is_empty() {
8486
new_input = build_project_plan(new_input, affected_id, expr_set)?;
8587
}
@@ -94,6 +96,16 @@ impl OptimizerRule for CommonSubexprEliminate {
9496
plan: &LogicalPlan,
9597
optimizer_config: &mut OptimizerConfig,
9698
) -> Result<LogicalPlan> {
99+
Ok(self
100+
.try_optimize(plan, optimizer_config)?
101+
.unwrap_or_else(|| plan.clone()))
102+
}
103+
104+
fn try_optimize(
105+
&self,
106+
plan: &LogicalPlan,
107+
optimizer_config: &mut OptimizerConfig,
108+
) -> Result<Option<LogicalPlan>> {
97109
let mut expr_set = ExprSet::new();
98110

99111
match plan {
@@ -113,11 +125,13 @@ impl OptimizerRule for CommonSubexprEliminate {
113125
optimizer_config,
114126
)?;
115127

116-
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
117-
pop_expr(&mut new_expr)?,
118-
Arc::new(new_input),
119-
schema.clone(),
120-
)?))
128+
Ok(Some(LogicalPlan::Projection(
129+
Projection::try_new_with_schema(
130+
pop_expr(&mut new_expr)?,
131+
Arc::new(new_input),
132+
schema.clone(),
133+
)?,
134+
)))
121135
}
122136
LogicalPlan::Filter(filter) => {
123137
let input = filter.input();
@@ -140,10 +154,10 @@ impl OptimizerRule for CommonSubexprEliminate {
140154
)?;
141155

142156
if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
143-
Ok(LogicalPlan::Filter(Filter::try_new(
157+
Ok(Some(LogicalPlan::Filter(Filter::try_new(
144158
predicate,
145159
Arc::new(new_input),
146-
)?))
160+
)?)))
147161
} else {
148162
Err(DataFusionError::Internal(
149163
"Failed to pop predicate expr".to_string(),
@@ -166,11 +180,11 @@ impl OptimizerRule for CommonSubexprEliminate {
166180
optimizer_config,
167181
)?;
168182

169-
Ok(LogicalPlan::Window(Window {
183+
Ok(Some(LogicalPlan::Window(Window {
170184
input: Arc::new(new_input),
171185
window_expr: pop_expr(&mut new_expr)?,
172186
schema: schema.clone(),
173-
}))
187+
})))
174188
}
175189
LogicalPlan::Aggregate(Aggregate {
176190
group_expr,
@@ -194,12 +208,14 @@ impl OptimizerRule for CommonSubexprEliminate {
194208
let new_aggr_expr = pop_expr(&mut new_expr)?;
195209
let new_group_expr = pop_expr(&mut new_expr)?;
196210

197-
Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
198-
Arc::new(new_input),
199-
new_group_expr,
200-
new_aggr_expr,
201-
schema.clone(),
202-
)?))
211+
Ok(Some(LogicalPlan::Aggregate(
212+
Aggregate::try_new_with_schema(
213+
Arc::new(new_input),
214+
new_group_expr,
215+
new_aggr_expr,
216+
schema.clone(),
217+
)?,
218+
)))
203219
}
204220
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
205221
let input_schema = Arc::clone(input.schema());
@@ -213,11 +229,11 @@ impl OptimizerRule for CommonSubexprEliminate {
213229
optimizer_config,
214230
)?;
215231

216-
Ok(LogicalPlan::Sort(Sort {
232+
Ok(Some(LogicalPlan::Sort(Sort {
217233
expr: pop_expr(&mut new_expr)?,
218234
input: Arc::new(new_input),
219235
fetch: *fetch,
220-
}))
236+
})))
221237
}
222238
LogicalPlan::Join(_)
223239
| LogicalPlan::CrossJoin(_)
@@ -243,7 +259,11 @@ impl OptimizerRule for CommonSubexprEliminate {
243259
| LogicalPlan::Extension(_)
244260
| LogicalPlan::Prepare(_) => {
245261
// apply the optimization to all inputs of the plan
246-
utils::optimize_children(self, plan, optimizer_config)
262+
Ok(Some(utils::optimize_children(
263+
self,
264+
plan,
265+
optimizer_config,
266+
)?))
247267
}
248268
}
249269
}

datafusion/optimizer/src/decorrelate_where_exists.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ impl DecorrelateWhereExists {
5757
for it in filters.iter() {
5858
match it {
5959
Expr::Exists { subquery, negated } => {
60-
let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
61-
let subquery = Arc::new(subquery);
60+
let subquery = self
61+
.try_optimize(&subquery.subquery, optimizer_config)?
62+
.map(Arc::new)
63+
.unwrap_or_else(|| subquery.subquery.clone());
6264
let subquery = Subquery { subquery };
6365
let subquery = SubqueryInfo::new(subquery.clone(), *negated);
6466
subqueries.push(subquery);
@@ -90,10 +92,12 @@ impl OptimizerRule for DecorrelateWhereExists {
9092
match plan {
9193
LogicalPlan::Filter(filter) => {
9294
let predicate = filter.predicate();
93-
let filter_input = filter.input();
95+
let filter_input = filter.input().as_ref();
9496

9597
// Apply optimizer rule to current input
96-
let optimized_input = self.optimize(filter_input, optimizer_config)?;
98+
let optimized_input = self
99+
.try_optimize(filter_input, optimizer_config)?
100+
.unwrap_or_else(|| filter_input.clone());
97101

98102
let (subqueries, other_exprs) =
99103
self.extract_subquery_exprs(predicate, optimizer_config)?;
@@ -107,7 +111,7 @@ impl OptimizerRule for DecorrelateWhereExists {
107111
}
108112

109113
// iterate through all exists clauses in predicate, turning each into a join
110-
let mut cur_input = (**filter_input).clone();
114+
let mut cur_input = filter_input.clone();
111115
for subquery in subqueries {
112116
if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
113117
{

datafusion/optimizer/src/decorrelate_where_in.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ impl DecorrelateWhereIn {
5959
subquery,
6060
negated,
6161
} => {
62-
let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
63-
let subquery = Arc::new(subquery);
62+
let subquery = self
63+
.try_optimize(&subquery.subquery, optimizer_config)?
64+
.map(Arc::new)
65+
.unwrap_or_else(|| subquery.subquery.clone());
6466
let subquery = Subquery { subquery };
6567
let subquery =
6668
SubqueryInfo::new(subquery.clone(), (**expr).clone(), *negated);
@@ -81,13 +83,25 @@ impl OptimizerRule for DecorrelateWhereIn {
8183
plan: &LogicalPlan,
8284
optimizer_config: &mut OptimizerConfig,
8385
) -> datafusion_common::Result<LogicalPlan> {
86+
Ok(self
87+
.try_optimize(plan, optimizer_config)?
88+
.unwrap_or_else(|| plan.clone()))
89+
}
90+
91+
fn try_optimize(
92+
&self,
93+
plan: &LogicalPlan,
94+
optimizer_config: &mut OptimizerConfig,
95+
) -> datafusion_common::Result<Option<LogicalPlan>> {
8496
match plan {
8597
LogicalPlan::Filter(filter) => {
8698
let predicate = filter.predicate();
87-
let filter_input = filter.input();
99+
let filter_input = filter.input().as_ref();
88100

89101
// Apply optimizer rule to current input
90-
let optimized_input = self.optimize(filter_input, optimizer_config)?;
102+
let optimized_input = self
103+
.try_optimize(filter_input, optimizer_config)?
104+
.unwrap_or_else(|| filter_input.clone());
91105

92106
let (subqueries, other_exprs) =
93107
self.extract_subquery_exprs(predicate, optimizer_config)?;
@@ -97,11 +111,11 @@ impl OptimizerRule for DecorrelateWhereIn {
97111
)?);
98112
if subqueries.is_empty() {
99113
// regular filter, no subquery exists clause here
100-
return Ok(optimized_plan);
114+
return Ok(Some(optimized_plan));
101115
}
102116

103117
// iterate through all exists clauses in predicate, turning each into a join
104-
let mut cur_input = (**filter_input).clone();
118+
let mut cur_input = filter_input.clone();
105119
for subquery in subqueries {
106120
cur_input = optimize_where_in(
107121
&subquery,
@@ -110,11 +124,15 @@ impl OptimizerRule for DecorrelateWhereIn {
110124
optimizer_config,
111125
)?;
112126
}
113-
Ok(cur_input)
127+
Ok(Some(cur_input))
114128
}
115129
_ => {
116130
// Apply the optimization to all inputs of the plan
117-
utils::optimize_children(self, plan, optimizer_config)
131+
Ok(Some(utils::optimize_children(
132+
self,
133+
plan,
134+
optimizer_config,
135+
)?))
118136
}
119137
}
120138
}

datafusion/optimizer/src/eliminate_cross_join.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ impl OptimizerRule for EliminateCrossJoin {
5656
plan: &LogicalPlan,
5757
_optimizer_config: &mut OptimizerConfig,
5858
) -> Result<LogicalPlan> {
59+
Ok(self
60+
.try_optimize(plan, _optimizer_config)?
61+
.unwrap_or_else(|| plan.clone()))
62+
}
63+
64+
fn try_optimize(
65+
&self,
66+
plan: &LogicalPlan,
67+
_optimizer_config: &mut OptimizerConfig,
68+
) -> Result<Option<LogicalPlan>> {
5969
match plan {
6070
LogicalPlan::Filter(filter) => {
6171
let input = (**filter.input()).clone();
@@ -78,7 +88,11 @@ impl OptimizerRule for EliminateCrossJoin {
7888
)?;
7989
}
8090
_ => {
81-
return utils::optimize_children(self, plan, _optimizer_config);
91+
return Ok(Some(utils::optimize_children(
92+
self,
93+
plan,
94+
_optimizer_config,
95+
)?));
8296
}
8397
}
8498

@@ -109,23 +123,26 @@ impl OptimizerRule for EliminateCrossJoin {
109123

110124
// if there are no join keys then do nothing.
111125
if all_join_keys.is_empty() {
112-
Ok(LogicalPlan::Filter(Filter::try_new(
126+
Ok(Some(LogicalPlan::Filter(Filter::try_new(
113127
predicate.clone(),
114128
Arc::new(left),
115-
)?))
129+
)?)))
116130
} else {
117131
// remove join expressions from filter
118132
match remove_join_expressions(predicate, &all_join_keys)? {
119-
Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
120-
filter_expr,
121-
Arc::new(left),
122-
)?)),
123-
_ => Ok(left),
133+
Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
134+
Filter::try_new(filter_expr, Arc::new(left))?,
135+
))),
136+
_ => Ok(Some(left)),
124137
}
125138
}
126139
}
127140

128-
_ => utils::optimize_children(self, plan, _optimizer_config),
141+
_ => Ok(Some(utils::optimize_children(
142+
self,
143+
plan,
144+
_optimizer_config,
145+
)?)),
129146
}
130147
}
131148

datafusion/optimizer/src/eliminate_filter.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ impl OptimizerRule for EliminateFilter {
4242
plan: &LogicalPlan,
4343
_optimizer_config: &mut OptimizerConfig,
4444
) -> Result<LogicalPlan> {
45+
Ok(self
46+
.try_optimize(plan, _optimizer_config)?
47+
.unwrap_or_else(|| plan.clone()))
48+
}
49+
50+
fn try_optimize(
51+
&self,
52+
plan: &LogicalPlan,
53+
_optimizer_config: &mut OptimizerConfig,
54+
) -> Result<Option<LogicalPlan>> {
4555
let predicate_and_input = match plan {
4656
LogicalPlan::Filter(filter) => match filter.predicate() {
4757
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
@@ -53,14 +63,18 @@ impl OptimizerRule for EliminateFilter {
5363
};
5464

5565
match predicate_and_input {
56-
Some((true, input)) => self.optimize(input, _optimizer_config),
57-
Some((false, input)) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
66+
Some((true, input)) => self.try_optimize(input, _optimizer_config),
67+
Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
5868
produce_one_row: false,
5969
schema: input.schema().clone(),
60-
})),
70+
}))),
6171
None => {
6272
// Apply the optimization to all inputs of the plan
63-
utils::optimize_children(self, plan, _optimizer_config)
73+
Ok(Some(utils::optimize_children(
74+
self,
75+
plan,
76+
_optimizer_config,
77+
)?))
6478
}
6579
}
6680
}

datafusion/optimizer/src/eliminate_limit.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,43 @@ impl OptimizerRule for EliminateLimit {
4242
plan: &LogicalPlan,
4343
optimizer_config: &mut OptimizerConfig,
4444
) -> Result<LogicalPlan> {
45+
Ok(self
46+
.try_optimize(plan, optimizer_config)?
47+
.unwrap_or_else(|| plan.clone()))
48+
}
49+
50+
fn try_optimize(
51+
&self,
52+
plan: &LogicalPlan,
53+
optimizer_config: &mut OptimizerConfig,
54+
) -> Result<Option<LogicalPlan>> {
4555
if let LogicalPlan::Limit(limit) = plan {
4656
match limit.fetch {
4757
Some(fetch) => {
4858
if fetch == 0 {
49-
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
59+
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
5060
produce_one_row: false,
5161
schema: limit.input.schema().clone(),
52-
}));
62+
})));
5363
}
5464
}
5565
None => {
5666
if limit.skip == 0 {
5767
let input = &*limit.input;
58-
return utils::optimize_children(self, input, optimizer_config);
68+
return Ok(Some(utils::optimize_children(
69+
self,
70+
input,
71+
optimizer_config,
72+
)?));
5973
}
6074
}
6175
}
6276
}
63-
utils::optimize_children(self, plan, optimizer_config)
77+
Ok(Some(utils::optimize_children(
78+
self,
79+
plan,
80+
optimizer_config,
81+
)?))
6482
}
6583

6684
fn name(&self) -> &str {

0 commit comments

Comments
 (0)