1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

370 changes: 365 additions & 5 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -242,8 +242,8 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
@r"
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] AND DynamicFilterPhysicalExpr [ true ]
"
);

@@ -264,9 +264,9 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
        format_plan_for_test(&plan),
        @r"
    - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab]
        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
    "
    );
}
@@ -734,6 +734,366 @@ async fn test_topk_dynamic_filter_pushdown() {
    );
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown() {
Member:
Can we add some tests for multiple joins? Such as

    Join (t1.a = t2.b)
     /        \
    t1    Join(t2.c = t3.d)
            /    \
          t3     t2

Member:
Such a test can check that:

  1. dynamic filters are pushed down to the right scan node
  2. dynamic filters aren't missed during pushdown

Contributor Author:
I've added a test that I think matches your suggestion.

    use datafusion_common::JoinType;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Create build side with limited values
    let build_batches = vec![record_batch!(
        ("a", Utf8, ["aa", "ab"]),
        ("b", Utf8, ["ba", "bb"]),
Contributor:
We may add some Utf8View field test cases, because our default mapping has already changed to Utf8View.

("c", Float64, [1.0, 2.0]) // Extra column not used in join
)
.unwrap()];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

    // Create probe side with more values
    let probe_batches = vec![record_batch!(
        ("a", Utf8, ["aa", "ab", "ac", "ad"]),
        ("b", Utf8, ["ba", "bb", "bc", "bd"]),
        ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
    )
    .unwrap()];
    let probe_side_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("e", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
        .with_support(true)
        .with_batches(probe_batches)
        .build();

    // Create HashJoinExec with dynamic filter
    let on = vec![
        (
            col("a", &build_side_schema).unwrap(),
            col("a", &probe_side_schema).unwrap(),
        ),
        (
            col("b", &build_side_schema).unwrap(),
            col("b", &probe_side_schema).unwrap(),
        ),
    ];
    let plan = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            probe_scan,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::Partitioned,
            datafusion_common::NullEquality::NullEqualsNothing,
        )
        .unwrap(),
    ) as Arc<dyn ExecutionPlan>;

    // expect the predicate to be pushed down into the probe side DataSource
    insta::assert_snapshot!(
        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
        @r"
    OptimizationTest:
      input:
        - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
      output:
        Ok:
          - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
    ",
    );

    // Actually apply the optimization to the plan and execute to see the filter in action
    let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
    config.optimizer.enable_dynamic_filter_pushdown = true;
    let plan = FilterPushdown::new_post_optimization()
        .optimize(plan, &config)
        .unwrap();
    let config = SessionConfig::new().with_batch_size(10);
    let session_ctx = SessionContext::new_with_config(config);
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let state = session_ctx.state();
    let task_ctx = state.task_ctx();
    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
    // Iterate one batch
    stream.next().await.unwrap().unwrap();

    // Now check what our filter looks like
    insta::assert_snapshot!(
        format!("{}", format_plan_for_test(&plan)),
        @r"
    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb]
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
    "
    );
}
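// Aside (not part of this PR's diff): a minimal, hypothetical sketch for the
// Utf8View review comment above. It builds the same two-row build-side batch
// as in the test, but with Utf8View string columns, assuming arrow's
// StringViewArray API; names here are illustrative only.
fn utf8view_build_batch() -> arrow::record_batch::RecordBatch {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Float64Array, StringViewArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    // Same columns as the Utf8 version, with string fields switched to Utf8View
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8View, false),
        Field::new("b", DataType::Utf8View, false),
        Field::new("c", DataType::Float64, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringViewArray::from_iter_values(["aa", "ab"])) as ArrayRef,
            Arc::new(StringViewArray::from_iter_values(["ba", "bb"])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
        ],
    )
    .unwrap()
}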

#[tokio::test]
async fn test_nested_hashjoin_dynamic_filter_pushdown() {
    use datafusion_common::JoinType;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Create test data for three tables: t1, t2, t3
    // t1: small table with limited values (will be build side of outer join)
    let t1_batches =
        vec![
            record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0, 2.0])).unwrap(),
        ];
    let t1_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("x", DataType::Float64, false),
    ]));
    let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema))
        .with_support(true)
        .with_batches(t1_batches)
        .build();

    // t2: larger table (will be probe side of inner join, build side of outer join)
    let t2_batches = vec![record_batch!(
        ("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]),
        ("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]),
        ("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0])
    )
    .unwrap()];
    let t2_schema = Arc::new(Schema::new(vec![
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Utf8, false),
        Field::new("y", DataType::Float64, false),
    ]));
    let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema))
        .with_support(true)
        .with_batches(t2_batches)
        .build();

    // t3: largest table (will be probe side of inner join)
    let t3_batches = vec![record_batch!(
        ("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]),
        ("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
    )
    .unwrap()];
    let t3_schema = Arc::new(Schema::new(vec![
        Field::new("d", DataType::Utf8, false),
        Field::new("z", DataType::Float64, false),
    ]));
    let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema))
        .with_support(true)
        .with_batches(t3_batches)
        .build();

    // Create nested join structure:
    //       Join (t1.a = t2.b)
    //      /    \
    //    t1    Join(t2.c = t3.d)
    //           /    \
    //         t2      t3

    // First create inner join: t2.c = t3.d
    let inner_join_on =
        vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())];
    let inner_join = Arc::new(
        HashJoinExec::try_new(
            t2_scan,
            t3_scan,
            inner_join_on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::Partitioned,
            datafusion_common::NullEquality::NullEqualsNothing,
        )
        .unwrap(),
    );

    // Then create outer join: t1.a = t2.b (from inner join result)
    let outer_join_on = vec![(
        col("a", &t1_schema).unwrap(),
        col("b", &inner_join.schema()).unwrap(),
    )];
    let outer_join = Arc::new(
        HashJoinExec::try_new(
            t1_scan,
            inner_join as Arc<dyn ExecutionPlan>,
            outer_join_on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::Partitioned,
            datafusion_common::NullEquality::NullEqualsNothing,
        )
        .unwrap(),
    ) as Arc<dyn ExecutionPlan>;

    // Test that dynamic filters are pushed down correctly through nested joins
    insta::assert_snapshot!(
        OptimizationTest::new(Arc::clone(&outer_join), FilterPushdown::new_post_optimization(), true),
        @r"
    OptimizationTest:
      input:
        - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
          - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true
      output:
        Ok:
          - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true, predicate=true
            - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
              - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=true AND DynamicFilterPhysicalExpr [ true ]
              - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
    ",
    );

    // Execute the plan to verify the dynamic filters are properly updated
    let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
    config.optimizer.enable_dynamic_filter_pushdown = true;
    let plan = FilterPushdown::new_post_optimization()
        .optimize(outer_join, &config)
        .unwrap();
    let config = SessionConfig::new().with_batch_size(10);
    let session_ctx = SessionContext::new_with_config(config);
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let state = session_ctx.state();
    let task_ctx = state.task_ctx();
    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
    // Execute to populate the dynamic filters
    stream.next().await.unwrap().unwrap();

    // Verify that both the inner and outer join have updated dynamic filters
    insta::assert_snapshot!(
        format!("{}", format_plan_for_test(&plan)),
        @r"
    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true, predicate=true
      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=true AND DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
    "
    );
}

#[tokio::test]
async fn test_hashjoin_parent_filter_pushdown() {
    use datafusion_common::JoinType;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Create build side with limited values
    let build_batches = vec![record_batch!(
        ("a", Utf8, ["aa", "ab"]),
        ("b", Utf8, ["ba", "bb"]),
        ("c", Float64, [1.0, 2.0])
    )
    .unwrap()];
    let build_side_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
        .with_support(true)
        .with_batches(build_batches)
        .build();

    // Create probe side with more values
    let probe_batches = vec![record_batch!(
        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
    )
    .unwrap()];
    let probe_side_schema = Arc::new(Schema::new(vec![
        Field::new("d", DataType::Utf8, false),
        Field::new("e", DataType::Utf8, false),
        Field::new("f", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
        .with_support(true)
        .with_batches(probe_batches)
        .build();

    // Create HashJoinExec
    let on = vec![(
        col("a", &build_side_schema).unwrap(),
        col("d", &probe_side_schema).unwrap(),
    )];
    let join = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            probe_scan,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::Partitioned,
            datafusion_common::NullEquality::NullEqualsNothing,
        )
        .unwrap(),
    );

    // Create filters that can be pushed down to different sides
    // We need to create filters in the context of the join output schema
    let join_schema = join.schema();

    // Filter on build side column: a = 'aa'
    let left_filter = col_lit_predicate("a", "aa", &join_schema);
    // Filter on probe side column: e = 'ba'
    let right_filter = col_lit_predicate("e", "ba", &join_schema);
    // Filter that references both sides: a = d (should not be pushed down)
    let cross_filter = Arc::new(BinaryExpr::new(
        col("a", &join_schema).unwrap(),
        Operator::Eq,
        col("d", &join_schema).unwrap(),
    )) as Arc<dyn PhysicalExpr>;

    let filter =
        Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
    let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
    let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
        as Arc<dyn ExecutionPlan>;

    // Test that filters are pushed down correctly to each side of the join
    insta::assert_snapshot!(
        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
        @r"
    OptimizationTest:
      input:
        - FilterExec: a@0 = d@3
          - FilterExec: e@4 = ba
            - FilterExec: a@0 = aa
              - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
                - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
                - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
      output:
        Ok:
          - FilterExec: a@0 = d@3
            - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
              - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
              - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
    "
    );
}
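// Aside (not part of this PR's diff): the col_lit_predicate helper used above
// is defined elsewhere in this test module. A hypothetical sketch of what such
// a helper might look like, building a `col = 'literal'` physical expression;
// the name and signature here are assumptions for illustration only.
fn col_lit_predicate_sketch(
    column: &str,
    value: &str,
    schema: &arrow::datatypes::Schema,
) -> std::sync::Arc<dyn datafusion_physical_expr::PhysicalExpr> {
    use std::sync::Arc;

    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};

    // Resolve the column against the schema and compare it to a Utf8 literal
    Arc::new(BinaryExpr::new(
        col(column, schema).unwrap(),
        Operator::Eq,
        lit(ScalarValue::from(value)),
    ))
}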

/// Integration test for dynamic filter pushdown with TopK.
/// We use an integration test because there are complex interactions in the optimizer rules
/// that the unit tests applying a single optimizer rule do not cover.