Skip to content

Commit 4a36aaf

Browse files
committed
Pass filter pushdown through cooperative exec
1 parent d8078f8 commit 4a36aaf

File tree

1 file changed

+24
-0
lines changed
  • datafusion/physical-plan/src

1 file changed

+24
-0
lines changed

datafusion/physical-plan/src/coop.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
//! The optimizer rule currently checks the plan for exchange-like operators and leave operators
6666
//! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties).
6767
68+
use datafusion_common::config::ConfigOptions;
69+
use datafusion_physical_expr::PhysicalExpr;
6870
#[cfg(datafusion_coop = "tokio_fallback")]
6971
use futures::Future;
7072
use std::any::Any;
@@ -73,6 +75,10 @@ use std::sync::Arc;
7375
use std::task::{Context, Poll};
7476

7577
use crate::execution_plan::CardinalityEffect::{self, Equal};
78+
use crate::filter_pushdown::{
79+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
80+
FilterPushdownPropagation,
81+
};
7682
use crate::{
7783
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
7884
SendableRecordBatchStream,
@@ -289,6 +295,24 @@ impl ExecutionPlan for CooperativeExec {
289295
fn cardinality_effect(&self) -> CardinalityEffect {
290296
Equal
291297
}
298+
299+
fn gather_filters_for_pushdown(
300+
&self,
301+
_phase: FilterPushdownPhase,
302+
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
303+
_config: &ConfigOptions,
304+
) -> Result<FilterDescription> {
305+
FilterDescription::from_children(parent_filters, &self.children())
306+
}
307+
308+
fn handle_child_pushdown_result(
309+
&self,
310+
_phase: FilterPushdownPhase,
311+
child_pushdown_result: ChildPushdownResult,
312+
_config: &ConfigOptions,
313+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
314+
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
315+
}
292316
}
293317

294318
/// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`].

0 commit comments

Comments
 (0)