Description
Is your feature request related to a problem or challenge?
In some cases, the EnforceDistribution physical optimizer pass in DataFusion introduces an InterleaveExec to increase partitioning when data passes through a UnionExec:
datafusion/datafusion/core/src/physical_optimizer/enforce_distribution.rs
Lines 1196 to 1226 in 2231183
```rust
plan = if plan.as_any().is::<UnionExec>()
    && !config.optimizer.prefer_existing_union
    && can_interleave(children_plans.iter())
{
    // Add a special case for [`UnionExec`] since we want to "bubble up"
    // hash-partitioned data. So instead of
    //
    // Agg:
    //   Repartition (hash):
    //     Union:
    //       - Agg:
    //           Repartition (hash):
    //             Data
    //       - Agg:
    //           Repartition (hash):
    //             Data
    //
    // we can use:
    //
    // Agg:
    //   Interleave:
    //     - Agg:
    //         Repartition (hash):
    //           Data
    //     - Agg:
    //         Repartition (hash):
    //           Data
    Arc::new(InterleaveExec::try_new(children_plans)?)
} else {
    plan.with_new_children(children_plans)?
};
```
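The condition above can be modeled with a small, self-contained sketch. The `Partitioning` enum and the `can_interleave` and `choose_union_operator` functions below are simplified hypothetical stand-ins, not DataFusion's actual types or signatures. The sketch only assumes, per the InterleaveExec documentation quoted in this issue, that interleaving requires every input to share the same hash partitioning, and that setting `prefer_existing_union` keeps the plain union.

```rust
// Hypothetical, simplified model of the UnionExec -> InterleaveExec rule.
// None of these types or functions are DataFusion's real API.

/// Stand-in for a plan's output partitioning.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash-partitioned on the given key columns into `n` partitions.
    Hash(Vec<String>, usize),
    /// Round-robin partitioned into `n` partitions.
    RoundRobin(usize),
}

/// Interleaving is only valid when there is at least one input and every
/// input has the same hash partitioning (same keys, same partition count).
fn can_interleave(inputs: &[Partitioning]) -> bool {
    let Some(first) = inputs.first() else {
        return false;
    };
    matches!(first, Partitioning::Hash(_, _)) && inputs.iter().all(|p| p == first)
}

/// Name of the operator the rule would produce for a union over `inputs`.
fn choose_union_operator(inputs: &[Partitioning], prefer_existing_union: bool) -> &'static str {
    if !prefer_existing_union && can_interleave(inputs) {
        "InterleaveExec"
    } else {
        "UnionExec"
    }
}

fn main() {
    let hash = Partitioning::Hash(vec!["a".to_string()], 4);
    let same = vec![hash.clone(), hash.clone()];
    let mixed = vec![hash.clone(), Partitioning::RoundRobin(4)];

    println!("{}", choose_union_operator(&same, false)); // InterleaveExec
    println!("{}", choose_union_operator(&same, true)); // UnionExec (flag set)
    println!("{}", choose_union_operator(&mixed, false)); // UnionExec (partitioning differs)
}
```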
Here is what InterleaveExec does:
datafusion/datafusion/physical-plan/src/union.rs
Lines 286 to 317 in 4edbdd7
```rust
/// Combines multiple input streams by interleaving them.
///
/// This only works if all inputs have the same hash-partitioning.
///
/// # Data Flow
/// ```text
/// +---------+
/// |         |---+
/// | Input 1 |   |
/// |         |-------------+
/// +---------+   |         |
///               |         |         +---------+
///               +------------------>|         |
///                 +---------------->| Combine |-->
///                 | +-------------->|         |
///                 | |     |         +---------+
/// +---------+     | |     |
/// |         |-----+ |     |
/// | Input 2 |       |     |
/// |         |---------------+
/// +---------+       |     | |
///                   |     | |       +---------+
///                   |     +-------->|         |
///                   |       +------>| Combine |-->
///                   |         +---->|         |
///                   |         |     +---------+
/// +---------+       |         |
/// |         |-------+         |
/// | Input 3 |                 |
/// |         |-----------------+
/// +---------+
/// ```
```
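To make the data flow concrete, here is a minimal model of interleaving. It assumes each input is a list of partitions and each partition a list of batches of `i64` values; the `interleave` function and the data are hypothetical. Output partition `p` only ever receives partition `p` from each input, which is why the hash partitioning survives, but batches from different inputs are mixed together, so an ordering that held within each input partition no longer holds in the output.

```rust
// Hypothetical model: an input is a list of partitions, a partition is a
// list of batches, and a batch is just a Vec<i64>. Not DataFusion's API.
type Partition = Vec<Vec<i64>>;

/// Output partition `p` combines partition `p` from every input, taking one
/// batch from each input in turn. Round-robin is used here only to keep the
/// model deterministic; a real operator forwards batches as they arrive.
fn interleave(inputs: &[Vec<Partition>]) -> Vec<Partition> {
    let num_partitions = inputs.first().map_or(0, |input| input.len());
    assert!(
        inputs.iter().all(|input| input.len() == num_partitions),
        "all inputs must have the same number of partitions"
    );

    (0..num_partitions)
        .map(|p| {
            let mut out = Vec::new();
            let longest = inputs.iter().map(|input| input[p].len()).max().unwrap_or(0);
            for batch_idx in 0..longest {
                for input in inputs {
                    if let Some(batch) = input[p].get(batch_idx) {
                        out.push(batch.clone());
                    }
                }
            }
            out
        })
        .collect()
}

fn main() {
    // Two inputs, each with 2 partitions that are sorted within themselves.
    let input1 = vec![vec![vec![1, 2], vec![5, 6]], vec![vec![3], vec![7]]];
    let input2 = vec![vec![vec![0, 4], vec![8]], vec![vec![9, 10]]];

    let output = interleave(&[input1, input2]);
    // Prints [[1, 2], [0, 4], [5, 6], [8]]: every row stayed in partition 0,
    // but the rows are no longer sorted.
    println!("{:?}", output[0]);
}
```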
However, this can destroy any pre-existing ordering, and keeping that ordering is sometimes preferable to increasing or improving partitioning (e.g. see #10257 and the datafusion.optimizer.prefer_existing_sort setting).
Describe the solution you'd like
I would like some way to preserve the partitioning after a UnionExec without losing the ordering information, so that the prefer_existing_union flag can then be removed.
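A minimal sketch of the desired behavior, assuming every input partition is already sorted ascending on the same key: output partition `p` is a k-way merge of partition `p` from each input, so rows stay in the same hash partition and remain sorted. The `interleave_preserving_order` and `merge_sorted` functions are hypothetical models, not an existing DataFusion operator; the idea is similar in spirit to how RepartitionExec merges the streams feeding each output partition when `preserve_order` is set.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical order-preserving interleave. `inputs[i][p]` holds the rows of
/// partition `p` of input `i`, assumed sorted ascending. Output partition `p`
/// is a merge of partition `p` from every input.
fn interleave_preserving_order(inputs: &[Vec<Vec<i64>>]) -> Vec<Vec<i64>> {
    let num_partitions = inputs.first().map_or(0, |input| input.len());
    (0..num_partitions)
        .map(|p| merge_sorted(inputs.iter().map(|input| input[p].as_slice()).collect()))
        .collect()
}

/// Classic k-way merge: a min-heap holds (value, source index, position) for
/// the current head of each non-exhausted source.
fn merge_sorted(sources: Vec<&[i64]>) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (src, rows) in sources.iter().enumerate() {
        if let Some(&first) = rows.first() {
            heap.push(Reverse((first, src, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, src, pos))) = heap.pop() {
        out.push(value);
        // Refill the heap with the next row from the same source, if any.
        if let Some(&next) = sources[src].get(pos + 1) {
            heap.push(Reverse((next, src, pos + 1)));
        }
    }
    out
}

fn main() {
    let input1 = vec![vec![1, 5, 6], vec![3, 7]];
    let input2 = vec![vec![0, 4, 8], vec![9, 10]];
    let output = interleave_preserving_order(&[input1, input2]);
    println!("{:?}", output); // [[0, 1, 4, 5, 6, 8], [3, 7, 9, 10]]
}
```

The trade-off is that a merge needs the current head row of every non-exhausted input before it can emit anything, whereas plain interleaving can forward whichever batch arrives first.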
Describe alternatives you've considered
One possibility is to add a preserve_order flag to InterleaveExec, the same way RepartitionExec has a preserve_order flag:
datafusion/datafusion/physical-plan/src/repartition/mod.rs
Lines 328 to 417 in 4edbdd7
```rust
/// Maps `N` input partitions to `M` output partitions based on a
/// [`Partitioning`] scheme.
///
/// # Background
///
/// DataFusion, like most other commercial systems, with the
/// notable exception of DuckDB, uses the "Exchange Operator" based
/// approach to parallelism which works well in practice given
/// sufficient care in implementation.
///
/// DataFusion's planner picks the target number of partitions and
/// then `RepartionExec` redistributes [`RecordBatch`]es to that number
/// of output partitions.
///
/// For example, given `target_partitions=3` (trying to use 3 cores)
/// but scanning an input with 2 partitions, `RepartitionExec` can be
/// used to get 3 even streams of `RecordBatch`es
///
///
///```text
///        ▲                  ▲                  ▲
///        │                  │                  │
///        │                  │                  │
///        │                  │                  │
///┌───────────────┐  ┌───────────────┐  ┌───────────────┐
///│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
///│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
///└───────────────┘  └───────────────┘  └───────────────┘
///        ▲                  ▲                  ▲
///        └──────────────────┼──────────────────┘
///                           │
///              ┌─────────────────────────┐
///              │     RepartitionExec     │
///              │   (hash/round robin)    │
///              └─────────────────────────┘
///                         ▲   ▲
///             ┌───────────┘   └───────────┐
///             │                           │
///             │                           │
///        .─────────.                 .─────────.
///     ,─'           '─.           ,─'           '─.
///    ;      Input      :         ;      Input      :
///    :   Partition 0   ;         :   Partition 1   ;
///     ╲               ╱           ╲               ╱
///      '─.         ,─'             '─.         ,─'
///         `───────'                   `───────'
///```
///
/// # Output Ordering
///
/// If more than one stream is being repartitioned, the output will be some
/// arbitrary interleaving (and thus unordered) unless
/// [`Self::with_preserve_order`] specifies otherwise.
///
/// # Footnote
///
/// The "Exchange Operator" was first described in the 1989 paper
/// [Encapsulation of parallelism in the Volcano query processing
/// system
/// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
/// which uses the term "Exchange" for the concept of repartitioning
/// data across threads.
#[derive(Debug)]
pub struct RepartitionExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Partitioning scheme to use
    partitioning: Partitioning,
    /// Inner state that is initialized when the first output stream is created.
    state: LazyState,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Boolean flag to decide whether to preserve ordering. If true means
    /// `SortPreservingRepartitionExec`, false means `RepartitionExec`.
    preserve_order: bool,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}

#[derive(Debug, Clone)]
struct RepartitionMetrics {
    /// Time in nanos to execute child operator and fetch batches
    fetch_time: metrics::Time,
    /// Time in nanos to perform repartitioning
    repartition_time: metrics::Time,
    /// Time in nanos for sending resulting batches to channels.
    ///
    /// One metric per output partition.
    send_time: Vec<metrics::Time>,
}
```
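As a rough model of the N-to-M mapping described in the RepartitionExec documentation above, the sketch below routes each row to output partition `hash(key) % M`. It assumes `i64` keys and uses std's `DefaultHasher` as a stand-in for DataFusion's own hash function, so the concrete partition assignments will differ from DataFusion's; `hash_repartition` is a hypothetical name.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical model of hash repartitioning: every row from every one of the
/// `N` input partitions is routed to output partition `hash(key) % m`.
fn hash_repartition(inputs: &[Vec<i64>], m: usize) -> Vec<Vec<i64>> {
    let mut outputs = vec![Vec::new(); m];
    for partition in inputs {
        for &key in partition {
            // DefaultHasher stands in for DataFusion's real hash function.
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            let target = (hasher.finish() % m as u64) as usize;
            outputs[target].push(key);
        }
    }
    outputs
}

fn main() {
    // N = 2 input partitions repartitioned into M = 3 output partitions.
    let inputs = vec![vec![1, 2, 3, 4], vec![3, 4, 5, 6]];
    let outputs = hash_repartition(&inputs, 3);
    for (i, rows) in outputs.iter().enumerate() {
        println!("output partition {i}: {rows:?}");
    }
}
```

Because this model processes the inputs one after another, each output partition holds input 0's rows before input 1's. In the real operator, batches from different inputs arrive concurrently and are interleaved arbitrarily, which is why the output is unordered unless preserve_order is set.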
Additional context
We encountered this while working on #10259, where @mustafasrepo and @phillipleblanc pointed out that the prefer_existing_union config flag was effectively the same as prefer_existing_sort.