Skip to content

Commit 79e84c5

Browse files
adriangbRobert Ream
authored andcommitted
Add ExecutionPlan::reset_state (apache#17028)
* Add ExecutionPlan::reset_state Co-authored-by: Robert Ream <[email protected]> * Update datafusion/sqllogictest/test_files/cte.slt * Add reference * fmt * add to upgrade guide * add explain plan, implement in more plans * fmt * only explain --------- Co-authored-by: Robert Ream <[email protected]>
1 parent 2a1052c commit 79e84c5

File tree

8 files changed

+313
-25
lines changed

8 files changed

+313
-25
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue;
3232
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
3333

3434
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
35+
///
36+
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
37+
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
38+
/// the same `ExecutionPlan` is reused with different data.
3539
#[derive(Debug)]
3640
pub struct DynamicFilterPhysicalExpr {
3741
/// The original children of this PhysicalExpr, if any.
@@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr {
121125
/// do not change* since those will be used to determine what columns need to read or projected
122126
/// when evaluating the expression.
123127
///
128+
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
129+
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
130+
/// the same `ExecutionPlan` is reused with different data.
131+
///
124132
/// [`collect_columns`]: crate::utils::collect_columns
125133
#[allow(dead_code)] // Only used in tests for now
126134
pub fn new(

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
194194
children: Vec<Arc<dyn ExecutionPlan>>,
195195
) -> Result<Arc<dyn ExecutionPlan>>;
196196

197+
/// Reset any internal state within this [`ExecutionPlan`].
198+
///
199+
/// This method is called when an [`ExecutionPlan`] needs to be re-executed,
200+
/// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
201+
/// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
202+
/// are reset to their initial state.
203+
///
204+
/// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
205+
/// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
206+
/// necessarily resetting any internal state. Implementations that require resetting of some
207+
/// internal state should override this method to provide the necessary logic.
208+
///
209+
/// This method should *not* reset state recursively for children, as it is expected that
210+
/// it will be called from within a walk of the execution plan tree so that it will be called on each child later
211+
/// or was already called on each child.
212+
///
213+
/// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
214+
/// thus it is expected that any cached plan properties will remain valid after the reset.
215+
///
216+
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
217+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
218+
let children = self.children().into_iter().cloned().collect();
219+
self.with_new_children(children)
220+
}
221+
197222
/// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
198223
/// produce `target_partitions` partitions.
199224
///

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec {
270270
)))
271271
}
272272

273+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
274+
let new_exec = CrossJoinExec {
275+
left: Arc::clone(&self.left),
276+
right: Arc::clone(&self.right),
277+
schema: Arc::clone(&self.schema),
278+
left_fut: Default::default(), // reset the build side!
279+
metrics: ExecutionPlanMetricsSet::default(),
280+
cache: self.cache.clone(),
281+
};
282+
Ok(Arc::new(new_exec))
283+
}
284+
273285
fn required_input_distribution(&self) -> Vec<Distribution> {
274286
vec![
275287
Distribution::SinglePartition,

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec {
774774
)?))
775775
}
776776

777+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
778+
// Reset the left_fut to allow re-execution
779+
Ok(Arc::new(HashJoinExec {
780+
left: Arc::clone(&self.left),
781+
right: Arc::clone(&self.right),
782+
on: self.on.clone(),
783+
filter: self.filter.clone(),
784+
join_type: self.join_type,
785+
join_schema: Arc::clone(&self.join_schema),
786+
left_fut: OnceAsync::default(),
787+
random_state: self.random_state.clone(),
788+
mode: self.mode,
789+
metrics: ExecutionPlanMetricsSet::new(),
790+
projection: self.projection.clone(),
791+
column_indices: self.column_indices.clone(),
792+
null_equality: self.null_equality,
793+
cache: self.cache.clone(),
794+
}))
795+
}
796+
777797
fn execute(
778798
&self,
779799
partition: usize,

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ fn assign_work_table(
372372
}
373373

374374
/// Some plans will change their internal states after execution, making them unable to be executed again.
375-
/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
375+
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
376376
///
377377
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
378378
/// However, if the data of the left table is derived from the work table, it will become outdated
@@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl
383383
if plan.as_any().is::<WorkTableExec>() {
384384
Ok(Transformed::no(plan))
385385
} else {
386-
let new_plan = Arc::clone(&plan)
387-
.with_new_children(plan.children().into_iter().cloned().collect())?;
386+
let new_plan = Arc::clone(&plan).reset_state()?;
388387
Ok(Transformed::yes(new_plan))
389388
}
390389
})

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,29 @@ impl SortExec {
899899
self
900900
}
901901

902+
/// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
903+
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
904+
let children = self
905+
.expr
906+
.iter()
907+
.map(|sort_expr| Arc::clone(&sort_expr.expr))
908+
.collect::<Vec<_>>();
909+
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
910+
}
911+
912+
fn cloned(&self) -> Self {
913+
SortExec {
914+
input: Arc::clone(&self.input),
915+
expr: self.expr.clone(),
916+
metrics_set: self.metrics_set.clone(),
917+
preserve_partitioning: self.preserve_partitioning,
918+
common_sort_prefix: self.common_sort_prefix.clone(),
919+
fetch: self.fetch,
920+
cache: self.cache.clone(),
921+
filter: self.filter.clone(),
922+
}
923+
}
924+
902925
/// Modify how many rows to include in the result
903926
///
904927
/// If None, then all rows will be returned, in sorted order.
@@ -920,25 +943,13 @@ impl SortExec {
920943
}
921944
let filter = fetch.is_some().then(|| {
922945
// If we already have a filter, keep it. Otherwise, create a new one.
923-
self.filter.clone().unwrap_or_else(|| {
924-
let children = self
925-
.expr
926-
.iter()
927-
.map(|sort_expr| Arc::clone(&sort_expr.expr))
928-
.collect::<Vec<_>>();
929-
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
930-
})
946+
self.filter.clone().unwrap_or_else(|| self.create_filter())
931947
});
932-
SortExec {
933-
input: Arc::clone(&self.input),
934-
expr: self.expr.clone(),
935-
metrics_set: self.metrics_set.clone(),
936-
preserve_partitioning: self.preserve_partitioning,
937-
common_sort_prefix: self.common_sort_prefix.clone(),
938-
fetch,
939-
cache,
940-
filter,
941-
}
948+
let mut new_sort = self.cloned();
949+
new_sort.fetch = fetch;
950+
new_sort.cache = cache;
951+
new_sort.filter = filter;
952+
new_sort
942953
}
943954

944955
/// Input schema
@@ -1110,10 +1121,35 @@ impl ExecutionPlan for SortExec {
11101121
self: Arc<Self>,
11111122
children: Vec<Arc<dyn ExecutionPlan>>,
11121123
) -> Result<Arc<dyn ExecutionPlan>> {
1113-
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
1114-
.with_fetch(self.fetch)
1115-
.with_preserve_partitioning(self.preserve_partitioning);
1116-
new_sort.filter = self.filter.clone();
1124+
let mut new_sort = self.cloned();
1125+
assert!(
1126+
children.len() == 1,
1127+
"SortExec should have exactly one child"
1128+
);
1129+
new_sort.input = Arc::clone(&children[0]);
1130+
// Recompute the properties based on the new input since they may have changed
1131+
let (cache, sort_prefix) = Self::compute_properties(
1132+
&new_sort.input,
1133+
new_sort.expr.clone(),
1134+
new_sort.preserve_partitioning,
1135+
)?;
1136+
new_sort.cache = cache;
1137+
new_sort.common_sort_prefix = sort_prefix;
1138+
1139+
Ok(Arc::new(new_sort))
1140+
}
1141+
1142+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1143+
let children = self.children().into_iter().cloned().collect();
1144+
let new_sort = self.with_new_children(children)?;
1145+
let mut new_sort = new_sort
1146+
.as_any()
1147+
.downcast_ref::<SortExec>()
1148+
.expect("cloned 1 lines above this line, we know the type")
1149+
.clone();
1150+
// Our dynamic filter and execution metrics are the state we need to reset.
1151+
new_sort.filter = Some(new_sort.create_filter());
1152+
new_sort.metrics_set = ExecutionPlanMetricsSet::new();
11171153

11181154
Ok(Arc::new(new_sort))
11191155
}

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,61 @@ physical_plan
996996
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
997997
09)------------WorkTableExec: name=numbers
998998

999+
# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
1000+
query II
1001+
with recursive r as (
1002+
select 0 as k, 0 as v
1003+
union all
1004+
(
1005+
select *
1006+
from r
1007+
order by v
1008+
limit 1
1009+
)
1010+
)
1011+
select *
1012+
from r
1013+
limit 5;
1014+
----
1015+
0 0
1016+
0 0
1017+
0 0
1018+
0 0
1019+
0 0
1020+
1021+
query TT
1022+
explain
1023+
with recursive r as (
1024+
select 0 as k, 0 as v
1025+
union all
1026+
(
1027+
select *
1028+
from r
1029+
order by v
1030+
limit 1
1031+
)
1032+
)
1033+
select *
1034+
from r
1035+
limit 5;
1036+
----
1037+
logical_plan
1038+
01)SubqueryAlias: r
1039+
02)--Limit: skip=0, fetch=5
1040+
03)----RecursiveQuery: is_distinct=false
1041+
04)------Projection: Int64(0) AS k, Int64(0) AS v
1042+
05)--------EmptyRelation
1043+
06)------Sort: r.v ASC NULLS LAST, fetch=1
1044+
07)--------Projection: r.k, r.v
1045+
08)----------TableScan: r
1046+
physical_plan
1047+
01)GlobalLimitExec: skip=0, fetch=5
1048+
02)--RecursiveQueryExec: name=r, is_distinct=false
1049+
03)----ProjectionExec: expr=[0 as k, 0 as v]
1050+
04)------PlaceholderRowExec
1051+
05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false]
1052+
06)------WorkTableExec: name=r
1053+
9991054
statement count 0
10001055
set datafusion.execution.enable_recursive_ctes = false;
10011056

0 commit comments

Comments
 (0)