Skip to content

Commit 1a16ba9

Browse files
jackkleeman authored and LiaCastaneda committed
Push dynamic pushdown through CooperativeExec and ProjectionExec (apache#17238)
(cherry picked from commit 4bc0696)
1 parent 06fe687 commit 1a16ba9

File tree

4 files changed

+55
-6
lines changed

4 files changed

+55
-6
lines changed

datafusion/physical-plan/src/coop.rs

Lines changed: 27 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -65,17 +65,20 @@
6565
//! The optimizer rule currently checks the plan for exchange-like operators and leaf operators
6666
//! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties).
6767
68-
#[cfg(any(
69-
datafusion_coop = "tokio_fallback",
70-
not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream"))
71-
))]
68+
use datafusion_common::config::ConfigOptions;
69+
use datafusion_physical_expr::PhysicalExpr;
70+
#[cfg(datafusion_coop = "tokio_fallback")]
7271
use futures::Future;
7372
use std::any::Any;
7473
use std::pin::Pin;
7574
use std::sync::Arc;
7675
use std::task::{Context, Poll};
7776

7877
use crate::execution_plan::CardinalityEffect::{self, Equal};
78+
use crate::filter_pushdown::{
79+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
80+
FilterPushdownPropagation,
81+
};
7982
use crate::{
8083
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
8184
SendableRecordBatchStream,
@@ -164,6 +167,8 @@ where
164167
// after the work has been done and just assume that that succeeded.
165168
// The poll result is ignored because we don't want to discard
166169
// or buffer the Ready result we got from the inner stream.
170+
171+
use std::future::Future;
167172
let consume = tokio::task::coop::consume_budget();
168173
let consume_ref = std::pin::pin!(consume);
169174
let _ = consume_ref.poll(cx);
@@ -291,6 +296,24 @@ impl ExecutionPlan for CooperativeExec {
291296
/// Reports `Equal` as the cardinality effect of this operator.
// NOTE(review): presumably `Equal` means the row count is unchanged — confirm against `CardinalityEffect`.
fn cardinality_effect(&self) -> CardinalityEffect {
292297
Equal
293298
}
299+
300+
/// Describes how filters offered by the parent are handled at this node.
///
/// All `parent_filters` are handed, unmodified, to
/// `FilterDescription::from_children` together with this node's children.
/// The pushdown phase and the configuration are not consulted.
fn gather_filters_for_pushdown(
301+
&self,
302+
_phase: FilterPushdownPhase,
303+
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
304+
_config: &ConfigOptions,
305+
) -> Result<FilterDescription> {
306+
FilterDescription::from_children(parent_filters, &self.children())
307+
}
308+
309+
/// Turns the children's pushdown outcome into the propagation result
/// returned to the parent.
///
/// The outcome is passed straight to `FilterPushdownPropagation::if_all`;
/// the pushdown phase and the configuration are not consulted.
// NOTE(review): presumably `if_all` marks a parent filter as handled only
// when every child handled it — confirm against `FilterPushdownPropagation`.
fn handle_child_pushdown_result(
310+
&self,
311+
_phase: FilterPushdownPhase,
312+
child_pushdown_result: ChildPushdownResult,
313+
_config: &ConfigOptions,
314+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
315+
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
316+
}
294317
}
295318

296319
/// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`].

datafusion/physical-plan/src/projection.rs

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,11 +33,16 @@ use super::{
3333
SendableRecordBatchStream, Statistics,
3434
};
3535
use crate::execution_plan::CardinalityEffect;
36+
use crate::filter_pushdown::{
37+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
38+
FilterPushdownPropagation,
39+
};
3640
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
3741
use crate::{ColumnStatistics, DisplayFormatType, ExecutionPlan, PhysicalExpr};
3842

3943
use arrow::datatypes::{Field, Schema, SchemaRef};
4044
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
45+
use datafusion_common::config::ConfigOptions;
4146
use datafusion_common::stats::Precision;
4247
use datafusion_common::tree_node::{
4348
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -273,6 +278,27 @@ impl ExecutionPlan for ProjectionExec {
273278
Ok(Some(Arc::new(projection.clone())))
274279
}
275280
}
281+
282+
/// Describes how filters offered by the parent are handled at this node.
///
/// All `parent_filters` are handed, untransformed, to
/// `FilterDescription::from_children` together with this node's children.
/// The pushdown phase and the configuration are not consulted.
fn gather_filters_for_pushdown(
283+
&self,
284+
_phase: FilterPushdownPhase,
285+
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
286+
_config: &ConfigOptions,
287+
) -> Result<FilterDescription> {
288+
// TODO: In future, we can try to handle inverting aliases here.
289+
// For the time being, we pass through untransformed filters, so filters on aliases are not handled.
290+
// https://github.com/apache/datafusion/issues/17246
291+
FilterDescription::from_children(parent_filters, &self.children())
292+
}
293+
294+
/// Turns the children's pushdown outcome into the propagation result
/// returned to the parent.
///
/// The outcome is passed straight to `FilterPushdownPropagation::if_all`;
/// the pushdown phase and the configuration are not consulted.
// NOTE(review): presumably `if_all` marks a parent filter as handled only
// when every child handled it — confirm against `FilterPushdownPropagation`.
fn handle_child_pushdown_result(
295+
&self,
296+
_phase: FilterPushdownPhase,
297+
child_pushdown_result: ChildPushdownResult,
298+
_config: &ConfigOptions,
299+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
300+
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
301+
}
276302
}
277303

278304
fn stats_projection(

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -383,7 +383,7 @@ physical_plan
383383
44)-----------------------------│ -------------------- ││ -------------------- │
384384
45)-----------------------------│ files: 1 ││ partition_count(in->out): │
385385
46)-----------------------------│ format: parquet ││ 1 -> 4 │
386-
47)-----------------------------│ ││ │
386+
47)-----------------------------│ predicate: true ││ │
387387
48)-----------------------------│ ││ partitioning_scheme: │
388388
49)-----------------------------│ ││ RoundRobinBatch(4) │
389389
50)-----------------------------└───────────────────────────┘└─────────────┬─────────────┘

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -372,7 +372,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa
372372
physical_plan
373373
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
374374
02)--ProjectionExec: expr=[number@0 as number, letter@1 as letter, age@2 as age, number@0 as column4, letter@1 as column5]
375-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet
375+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
376376

377377
# Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age)
378378
query TT

0 commit comments

Comments
 (0)