Skip to content

Commit 5799f63

Browse files
committed
add parent filter support
1 parent b2db157 commit 5799f63

File tree

3 files changed

+334
-17
lines changed

3 files changed

+334
-17
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,255 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
544544
);
545545
}
546546

547+
/// Checks dynamic filter pushdown through two nested inner hash joins.
///
/// The plan is `t1 JOIN (t2 JOIN t3 ON t2.c = t3.d) ON t1.a = t2.b`, with `t1`
/// holding 2 rows, `t2` 5 rows and `t3` 8 rows. The test asserts two things:
///
/// 1. After `FilterPushdown::new_post_optimization()` the `t2` and `t3` scans
///    each carry a placeholder `DynamicFilterPhysicalExpr [ true ]`.
/// 2. After partition 0 is executed and one batch is pulled, the placeholders
///    have been replaced by range predicates (`b` bounded by `aa`..`ab` on the
///    `t2` scan, `d` bounded by `ca`..`ce` on the `t3` scan).
#[tokio::test]
548+
async fn test_nested_hashjoin_dynamic_filter_pushdown() {
549+
use datafusion_common::JoinType;
550+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
551+
552+
// Create test data for three tables: t1, t2, t3
553+
// t1: small table with limited values (will be build side of outer join)
554+
let t1_batches =
555+
vec![
556+
record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0, 2.0])).unwrap(),
557+
];
558+
let t1_schema = Arc::new(Schema::new(vec![
559+
Field::new("a", DataType::Utf8, false),
560+
Field::new("x", DataType::Float64, false),
561+
]));
562+
let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema))
563+
.with_support(true)
564+
.with_batches(t1_batches)
565+
.build();
566+
567+
// t2: larger table (left/build input of the inner join; the inner join's
// output is in turn the right/probe input of the outer join)
568+
let t2_batches = vec![record_batch!(
569+
("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]),
570+
("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]),
571+
("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0])
572+
)
573+
.unwrap()];
574+
let t2_schema = Arc::new(Schema::new(vec![
575+
Field::new("b", DataType::Utf8, false),
576+
Field::new("c", DataType::Utf8, false),
577+
Field::new("y", DataType::Float64, false),
578+
]));
579+
let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema))
580+
.with_support(true)
581+
.with_batches(t2_batches)
582+
.build();
583+
584+
// t3: largest table (will be probe side of inner join)
585+
let t3_batches = vec![record_batch!(
586+
("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]),
587+
("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
588+
)
589+
.unwrap()];
590+
let t3_schema = Arc::new(Schema::new(vec![
591+
Field::new("d", DataType::Utf8, false),
592+
Field::new("z", DataType::Float64, false),
593+
]));
594+
let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema))
595+
.with_support(true)
596+
.with_batches(t3_batches)
597+
.build();
598+
599+
// Create nested join structure:
600+
// Join (t1.a = t2.b)
601+
// / \
602+
// t1 Join(t2.c = t3.d)
603+
// / \
604+
// t2 t3
605+
606+
// First create inner join: t2.c = t3.d
607+
let inner_join_on =
608+
vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())];
609+
let inner_join = Arc::new(
610+
HashJoinExec::try_new(
611+
t2_scan,
612+
t3_scan,
613+
inner_join_on,
614+
None,
615+
&JoinType::Inner,
616+
None,
617+
PartitionMode::Partitioned,
618+
datafusion_common::NullEquality::NullEqualsNothing,
619+
)
620+
.unwrap(),
621+
);
622+
623+
// Then create outer join: t1.a = t2.b (from inner join result)
624+
let outer_join_on = vec![(
625+
col("a", &t1_schema).unwrap(),
626+
col("b", &inner_join.schema()).unwrap(),
627+
)];
628+
let outer_join = Arc::new(
629+
HashJoinExec::try_new(
630+
t1_scan,
631+
inner_join as Arc<dyn ExecutionPlan>,
632+
outer_join_on,
633+
None,
634+
&JoinType::Inner,
635+
None,
636+
PartitionMode::Partitioned,
637+
datafusion_common::NullEquality::NullEqualsNothing,
638+
)
639+
.unwrap(),
640+
) as Arc<dyn ExecutionPlan>;
641+
642+
// Test that dynamic filters are pushed down correctly through nested joins
643+
insta::assert_snapshot!(
644+
OptimizationTest::new(Arc::clone(&outer_join), FilterPushdown::new_post_optimization(), true),
645+
@r"
646+
OptimizationTest:
647+
input:
648+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
649+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
650+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
651+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
652+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true
653+
output:
654+
Ok:
655+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
656+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true, predicate=true
657+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
658+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
659+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
660+
",
661+
);
662+
663+
// Execute the plan to verify the dynamic filters are properly updated
664+
let mut config = ConfigOptions::default();
665+
config.execution.parquet.pushdown_filters = true;
666+
config.optimizer.enable_dynamic_filter_pushdown = true;
667+
let plan = FilterPushdown::new_post_optimization()
668+
.optimize(outer_join, &config)
669+
.unwrap();
670+
// NOTE: this shadows the optimizer `ConfigOptions` above; from here on
// `config` is the session configuration used to build the task context.
let config = SessionConfig::new().with_batch_size(10);
671+
let session_ctx = SessionContext::new_with_config(config);
672+
session_ctx.register_object_store(
673+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
674+
Arc::new(InMemory::new()),
675+
);
676+
let state = session_ctx.state();
677+
let task_ctx = state.task_ctx();
678+
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
679+
// Execute to populate the dynamic filters
680+
stream.next().await.unwrap().unwrap();
681+
682+
// Verify that both the inner and outer join have updated dynamic filters
683+
insta::assert_snapshot!(
684+
format!("{}", format_plan_for_test(&plan)),
685+
@r"
686+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
687+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true, predicate=true
688+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
689+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
690+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
691+
"
692+
);
693+
}
694+
695+
/// Checks that filters sitting above an inner `HashJoinExec` are pushed to
/// the matching join input.
///
/// Three `FilterExec`s are stacked on a join on `a = d`:
/// `a = 'aa'` (build-side column), `e = 'ba'` (probe-side column) and
/// `a = d` (references both sides). The snapshot pins that, after
/// `FilterPushdown::new()`:
///
/// * `a@0 = aa` appears as the predicate of the build-side scan;
/// * `e@1 = ba` appears as the predicate of the probe-side scan, with the
///   column index rewritten from `e@4` (join output) to `e@1` (probe input);
/// * a single `FilterExec` combining all three predicates remains above the
///   join.
#[tokio::test]
696+
async fn test_hashjoin_parent_filter_pushdown() {
697+
use datafusion_common::JoinType;
698+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
699+
700+
// Create build side with limited values
701+
let build_batches = vec![record_batch!(
702+
("a", Utf8, ["aa", "ab"]),
703+
("b", Utf8, ["ba", "bb"]),
704+
("c", Float64, [1.0, 2.0])
705+
)
706+
.unwrap()];
707+
let build_side_schema = Arc::new(Schema::new(vec![
708+
Field::new("a", DataType::Utf8, false),
709+
Field::new("b", DataType::Utf8, false),
710+
Field::new("c", DataType::Float64, false),
711+
]));
712+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
713+
.with_support(true)
714+
.with_batches(build_batches)
715+
.build();
716+
717+
// Create probe side with more values
718+
let probe_batches = vec![record_batch!(
719+
("d", Utf8, ["aa", "ab", "ac", "ad"]),
720+
("e", Utf8, ["ba", "bb", "bc", "bd"]),
721+
("f", Float64, [1.0, 2.0, 3.0, 4.0])
722+
)
723+
.unwrap()];
724+
let probe_side_schema = Arc::new(Schema::new(vec![
725+
Field::new("d", DataType::Utf8, false),
726+
Field::new("e", DataType::Utf8, false),
727+
Field::new("f", DataType::Float64, false),
728+
]));
729+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
730+
.with_support(true)
731+
.with_batches(probe_batches)
732+
.build();
733+
734+
// Create HashJoinExec
735+
let on = vec![(
736+
col("a", &build_side_schema).unwrap(),
737+
col("d", &probe_side_schema).unwrap(),
738+
)];
739+
let join = Arc::new(
740+
HashJoinExec::try_new(
741+
build_scan,
742+
probe_scan,
743+
on,
744+
None,
745+
&JoinType::Inner,
746+
None,
747+
PartitionMode::Partitioned,
748+
datafusion_common::NullEquality::NullEqualsNothing,
749+
)
750+
.unwrap(),
751+
);
752+
753+
// Create filters that can be pushed down to different sides
754+
// We need to create filters in the context of the join output schema
755+
let join_schema = join.schema();
756+
757+
// Filter on build side column: a = 'aa'
758+
let left_filter = col_lit_predicate("a", "aa", &join_schema);
759+
// Filter on probe side column: e = 'ba'
760+
let right_filter = col_lit_predicate("e", "ba", &join_schema);
761+
// Filter that references both sides: a = d (should not be pushed down)
762+
let cross_filter = Arc::new(BinaryExpr::new(
763+
col("a", &join_schema).unwrap(),
764+
Operator::Eq,
765+
col("d", &join_schema).unwrap(),
766+
)) as Arc<dyn PhysicalExpr>;
767+
768+
// Stack the filters bottom-up: left_filter directly above the join, then
// right_filter, then cross_filter on top.
let filter =
769+
Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
770+
let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
771+
let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
772+
as Arc<dyn ExecutionPlan>;
773+
774+
// Test that filters are pushed down correctly to each side of the join
775+
insta::assert_snapshot!(
776+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
777+
@r"
778+
OptimizationTest:
779+
input:
780+
- FilterExec: a@0 = d@3
781+
- FilterExec: e@4 = ba
782+
- FilterExec: a@0 = aa
783+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
784+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
785+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
786+
output:
787+
Ok:
788+
- FilterExec: e@4 = ba AND a@0 = d@3 AND a@0 = aa
789+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
790+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
791+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
792+
"
793+
);
794+
}
795+
547796
/// Integration test for dynamic filter pushdown with TopK.
548797
/// We use an integration test because there are complex interactions in the optimizer rules
549798
/// that the unit tests applying a single optimizer rule do not cover.

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,18 @@ impl FilterDescription {
353353
}
354354
}
355355

356+
pub fn with_child_pushdown(
357+
mut self,
358+
parent_filters: PredicateSupports,
359+
self_filters: Vec<Arc<dyn PhysicalExpr>>,
360+
) -> Self {
361+
self.child_filter_descriptions.push(ChildFilterDescription {
362+
parent_filters,
363+
self_filters,
364+
});
365+
self
366+
}
367+
356368
pub fn parent_filters(&self) -> Vec<PredicateSupports> {
357369
self.child_filter_descriptions
358370
.iter()

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use super::{
3434
};
3535
use super::{JoinOn, JoinOnRef};
3636
use crate::execution_plan::{boundedness_from_children, EmissionType};
37-
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
37+
use crate::filter_pushdown::{
38+
FilterDescription, FilterPushdownPhase, PredicateSupport, PredicateSupports,
39+
};
3840
use crate::projection::{
3941
try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
4042
ProjectionExec,
@@ -71,7 +73,7 @@ use arrow::util::bit_util;
7173
use datafusion_common::utils::memory::estimate_memory_size;
7274
use datafusion_common::{
7375
internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
74-
JoinSide, JoinType, NullEquality, Result, ScalarValue,
76+
HashSet, JoinSide, JoinType, NullEquality, Result, ScalarValue,
7577
};
7678
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
7779
use datafusion_execution::TaskContext;
@@ -80,6 +82,7 @@ use datafusion_physical_expr::equivalence::{
8082
join_equivalence_properties, ProjectionMapping,
8183
};
8284
use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr};
85+
use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
8386
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
8487
use datafusion_physical_expr_common::datum::compare_op_for_nested;
8588

@@ -985,26 +988,79 @@ impl ExecutionPlan for HashJoinExec {
985988
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
986989
config: &datafusion_common::config::ConfigOptions,
987990
) -> Result<FilterDescription> {
988-
// Don't allow parent filters to be pushed down for now
989-
// Only add our dynamic filter during the Post phase
990-
if !matches!(phase, FilterPushdownPhase::Post) {
991-
return Ok(FilterDescription::new_with_child_count(2)
992-
.all_parent_filters_unsupported(parent_filters));
991+
// Analyze parent filters to see which can be pushed down to which side
992+
let left_schema = self.left.schema();
993+
let left_column_names = left_schema
994+
.fields()
995+
.iter()
996+
.map(|f| f.name().as_str())
997+
.collect::<HashSet<_>>();
998+
let right_schema = self.right.schema();
999+
let right_column_names = right_schema
1000+
.fields()
1001+
.iter()
1002+
.map(|f| f.name().as_str())
1003+
.collect::<HashSet<_>>();
1004+
1005+
let mut left_filters = Vec::new();
1006+
let mut right_filters = Vec::new();
1007+
1008+
for filter in &parent_filters {
1009+
// Check which columns the filter references
1010+
let referenced_columns = collect_columns(filter);
1011+
1012+
// Categorize columns in the filter by which side they belong to
1013+
let references_left_columns = referenced_columns
1014+
.iter()
1015+
.any(|col| left_column_names.contains(col.name()));
1016+
let references_right_columns = referenced_columns
1017+
.iter()
1018+
.any(|col| right_column_names.contains(col.name()));
1019+
1020+
if references_left_columns && references_right_columns {
1021+
// Filter references both sides - cannot push down; mark it unsupported for both children
1022+
left_filters.push(PredicateSupport::Unsupported(Arc::clone(filter)));
1023+
right_filters.push(PredicateSupport::Unsupported(Arc::clone(filter)));
1024+
continue;
1025+
} else if references_left_columns {
1026+
// Filter only references left side - push to left
1027+
left_filters.push(PredicateSupport::Supported(
1028+
reassign_predicate_columns(Arc::clone(filter), &left_schema, false)?,
1029+
));
1030+
right_filters.push(PredicateSupport::Unsupported(Arc::clone(filter)));
1031+
} else if references_right_columns {
1032+
// Filter only references right side - push to right
1033+
// Need to adjust column indices for right side
1034+
left_filters.push(PredicateSupport::Unsupported(Arc::clone(filter)));
1035+
right_filters.push(PredicateSupport::Supported(
1036+
reassign_predicate_columns(Arc::clone(filter), &right_schema, false)?,
1037+
));
1038+
} else {
1039+
// Filter doesn't reference any columns from either side (e.g., constant)
1040+
// Push to both sides as it's still valid
1041+
left_filters.push(PredicateSupport::Supported(Arc::clone(filter)));
1042+
right_filters.push(PredicateSupport::Supported(Arc::clone(filter)));
1043+
}
9931044
}
9941045

995-
// Only push down dynamic filters if enabled
996-
if config.optimizer.enable_dynamic_filter_pushdown {
997-
let filter = Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>;
1046+
let mut right_self_filters = Vec::new();
1047+
if matches!(phase, FilterPushdownPhase::Post)
1048+
&& config.optimizer.enable_dynamic_filter_pushdown
1049+
{
1050+
let dynamic_filter =
1051+
Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>;
9981052
// Push the dynamic filter to the right side (probe side) only
999-
// Left side (build side) gets empty vec, right side gets the filter
1000-
let filters_for_children = vec![vec![], vec![filter]];
1001-
return Ok(FilterDescription::new_with_child_count(2)
1002-
.all_parent_filters_unsupported(parent_filters)
1003-
.with_self_filters_for_children(filters_for_children));
1053+
right_self_filters.push(dynamic_filter);
10041054
}
10051055

1006-
Ok(FilterDescription::new_with_child_count(2)
1007-
.all_parent_filters_unsupported(parent_filters))
1056+
let res = FilterDescription::new_with_child_count(0)
1057+
.with_child_pushdown(PredicateSupports::new(left_filters), vec![])
1058+
.with_child_pushdown(
1059+
PredicateSupports::new(right_filters),
1060+
right_self_filters,
1061+
);
1062+
1063+
Ok(res)
10081064
}
10091065
}
10101066

0 commit comments

Comments
 (0)