Merged
113 changes: 54 additions & 59 deletions datafusion/core/tests/dataframe/mod.rs

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -199,9 +199,8 @@ fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {

let expected = [
"ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" GlobalLimitExec: skip=0, fetch=5",
" FilterExec: c3@2 > 0",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
" FilterExec: c3@2 > 0, fetch=5",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

@@ -310,7 +309,7 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R

let expected = [
"CoalescePartitionsExec: fetch=5",
" FilterExec: c3@2 > 0",
" FilterExec: c3@2 > 0, fetch=5",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
25 changes: 11 additions & 14 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -766,19 +766,18 @@ async fn test_physical_plan_display_indent() {

assert_snapshot!(
actual,
@r###"
@r"
SortPreservingMergeExec: [the_min@2 DESC], fetch=10
SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]
ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]
AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000
AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: c12@1 < 10
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], file_type=csv, has_header=true
"###
FilterExec: c12@1 < 10
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], file_type=csv, has_header=true
"
);
}

@@ -1013,16 +1012,14 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
RecursiveQueryExec: name=number_series, is_distinct=false
CoalescePartitionsExec
ProjectionExec: expr=[id@0 as id, 1 as level]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: id@0 = 1
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
FilterExec: id@0 = 1
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
CoalescePartitionsExec
ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: id@0 < 10
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
WorkTableExec: name=number_series
FilterExec: id@0 < 10
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
WorkTableExec: name=number_series
"
);

5 changes: 2 additions & 3 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -26,7 +26,7 @@ use datafusion_common::error::Result;
use datafusion_common::{config::ConfigOptions, internal_err};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::{
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
joins::HashJoinExec, repartition::RepartitionExec, ExecutionPlan,
};

@@ -60,8 +60,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/datafusion/issues/139

Contributor: This comment could probably be removed now (the ticket is long since closed)

let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()

Contributor: It looks like we only have HashJoinExec and RepartitionExec to update before we could remove CoalesceBatches entirely 🤔

Contributor Author: Yes - I tried to apply this optimization to RepartitionExec but unfortunately got some regressions (my feeling is that this is due to decreased parallelism / work distribution).

HashJoinExec would be a great candidate, as it might be possible to avoid the take followed by concat, which might speed things up.

// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
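To make the trade-off in the review thread above concrete, here is a minimal, self-contained sketch of the coalescing idea (toy Rust types; not DataFusion's `BatchCoalescer` / `CoalesceBatchesExec` API): operators that tend to emit many small batches buffer rows until a target size is reached and hand larger batches downstream, which is what the remaining `wrap_in_coalesce` branches still ask for and what `FilterExec` now does internally.

```rust
// Toy illustration of batch coalescing (not DataFusion code): small input
// batches are buffered until `target_rows` rows are available, then emitted
// as one larger batch, so downstream operators see fewer, bigger batches.
struct ToyCoalescer {
    target_rows: usize,
    buffer: Vec<u64>,         // stands in for buffered Arrow arrays
    completed: Vec<Vec<u64>>, // batches that are ready to hand downstream
}

impl ToyCoalescer {
    fn new(target_rows: usize) -> Self {
        Self { target_rows, buffer: Vec::new(), completed: Vec::new() }
    }

    // Push one (possibly tiny) batch, e.g. the survivors of a selective filter.
    fn push(&mut self, batch: &[u64]) {
        self.buffer.extend_from_slice(batch);
        while self.buffer.len() >= self.target_rows {
            let rest = self.buffer.split_off(self.target_rows);
            self.completed.push(std::mem::replace(&mut self.buffer, rest));
        }
    }

    // Flush whatever is left once the input is exhausted.
    fn finish(&mut self) {
        if !self.buffer.is_empty() {
            self.completed.push(std::mem::take(&mut self.buffer));
        }
    }
}

fn main() {
    let mut coalescer = ToyCoalescer::new(4);
    // A selective filter typically produces a stream of small batches like these.
    for small in [vec![1u64], vec![2, 3], vec![4], vec![5, 6, 7]] {
        coalescer.push(&small);
    }
    coalescer.finish();
    // One full batch of 4 rows plus a final partial batch, instead of 4 tiny batches.
    assert_eq!(coalescer.completed, vec![vec![1, 2, 3, 4], vec![5, 6, 7]]);
}
```

The real operators buffer Arrow `RecordBatch`es and, with this PR, also respect an optional fetch limit; the sketch only shows the buffering idea.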
93 changes: 70 additions & 23 deletions datafusion/physical-plan/src/filter.rs
@@ -26,6 +26,8 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::coalesce::PushBatchStatus::LimitReached;
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
@@ -42,7 +44,7 @@ use crate::{
DisplayFormatType, ExecutionPlan,
};

use arrow::compute::{filter_record_batch, BatchCoalescer};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
@@ -87,6 +89,8 @@ pub struct FilterExec {
projection: Option<Vec<usize>>,
/// Target batch size for output batches
batch_size: usize,
/// Number of rows to fetch
fetch: Option<usize>,
}

impl FilterExec {
@@ -112,6 +116,7 @@
cache,
projection: None,
batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
fetch: None,
})
}
other => {
@@ -160,6 +165,7 @@
cache,
projection,
batch_size: self.batch_size,
fetch: self.fetch,
})
}

@@ -172,6 +178,7 @@
cache: self.cache.clone(),
projection: self.projection.clone(),
batch_size,
fetch: self.fetch,
})
}

@@ -351,7 +358,14 @@ impl DisplayAs for FilterExec {
} else {
"".to_string()
};
write!(f, "FilterExec: {}{}", self.predicate, display_projections)
let fetch = self
.fetch
.map_or_else(|| "".to_string(), |f| format!(", fetch={f}"));
write!(
f,
"FilterExec: {}{}{}",
self.predicate, display_projections, fetch
)
}
DisplayFormatType::TreeRender => {
write!(f, "predicate={}", fmt_sql(self.predicate.as_ref()))
@@ -393,7 +407,7 @@ impl ExecutionPlan for FilterExec {
e.with_default_selectivity(selectivity)
})
.and_then(|e| e.with_projection(self.projection().cloned()))
.map(|e| Arc::new(e) as _)
.map(|e| e.with_fetch(self.fetch).unwrap())
}

fn execute(
@@ -409,8 +423,11 @@
input: self.input.execute(partition, context)?,
metrics,
projection: self.projection.clone(),
batch_coalescer: BatchCoalescer::new(self.schema(), self.batch_size)
.with_biggest_coalesce_batch_size(Some(self.batch_size / 2)),
batch_coalescer: LimitedBatchCoalescer::new(
self.schema(),
self.batch_size,
self.fetch,
),
}))
}

@@ -569,6 +586,7 @@
)?,
projection: None,
batch_size: self.batch_size,
fetch: self.fetch,
};
Some(Arc::new(new) as _)
};
@@ -578,6 +596,19 @@
updated_node,
})
}

fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(Self {
predicate: Arc::clone(&self.predicate),
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
cache: self.cache.clone(),
projection: self.projection.clone(),
batch_size: self.batch_size,
fetch,
}))
}
}

impl EmbeddedProjection for FilterExec {
@@ -648,7 +679,7 @@ struct FilterExecStream {
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
/// Batch coalescer to combine small batches
batch_coalescer: BatchCoalescer,
batch_coalescer: LimitedBatchCoalescer,
}

/// The metrics for `FilterExec`
@@ -670,6 +701,23 @@ impl FilterExecMetrics {
}
}

impl FilterExecStream {
fn flush_remaining_batches(
&mut self,
) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
// Flush any remaining buffered batch
match self.batch_coalescer.finish() {
Ok(()) => {
Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
self.metrics.selectivity.add_part(batch.num_rows());
Ok(batch)
}))
}
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}

pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
@@ -715,7 +763,7 @@ impl Stream for FilterExecStream {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
self.predicate.as_ref()
let status = self.predicate.as_ref()
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
@@ -729,11 +777,11 @@
}).and_then(|(array, batch)| {
match as_boolean_array(&array) {
Ok(filter_array) => {
self.metrics.selectivity.add_part(filter_array.true_count());
self.metrics.selectivity.add_total(batch.num_rows());

self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?;
Ok(())
// TODO: support push_batch_with_filter in LimitedBatchCoalescer

Contributor: This is a good TODO -- it would be good to file it as a follow-on ticket / PR. I can do it if you like.

Contributor Author: Thanks, that would be nice.

let batch = filter_record_batch(&batch, filter_array)?;
let state = self.batch_coalescer.push_batch(batch)?;
Ok(state)
}
Err(_) => {
internal_err!(
@@ -742,28 +790,28 @@
}
}
})?;

timer.done();

if self.batch_coalescer.has_completed_batch() {
poll = Poll::Ready(Some(Ok(self
.batch_coalescer
.next_completed_batch()
.expect("has_completed_batch is true"))));
if let LimitReached = status {
poll = self.flush_remaining_batches();
break;
}

if let Some(batch) = self.batch_coalescer.next_completed_batch() {
self.metrics.selectivity.add_part(batch.num_rows());
poll = Poll::Ready(Some(Ok(batch)));
break;
}
continue;
}
None => {
// Flush any remaining buffered batch
match self.batch_coalescer.finish_buffered_batch() {
match self.batch_coalescer.finish() {
Ok(()) => {
poll = Poll::Ready(
self.batch_coalescer.next_completed_batch().map(Ok),
);
poll = self.flush_remaining_batches();
}
Err(e) => {
poll = Poll::Ready(Some(Err(e.into())));
poll = Poll::Ready(Some(Err(e)));
}
}
break;
@@ -782,7 +830,6 @@
self.input.size_hint()
}
}

impl RecordBatchStream for FilterExecStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
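On the `push_batch_with_filter` TODO discussed in the thread above, here is a hypothetical sketch of the shape such an API could take (toy Rust types; not the actual `LimitedBatchCoalescer` interface): the selection mask is applied while rows are copied into the buffer, and the method reports when the fetch limit has been reached so the caller can stop polling its input, mirroring the `LimitReached` status used in this PR.

```rust
// Hypothetical sketch of a limit-aware `push_batch_with_filter` (toy types,
// not the real LimitedBatchCoalescer API): the selection mask is applied
// while copying rows, and the caller is told when the fetch limit is hit.
#[derive(Debug, PartialEq)]
enum PushStatus {
    Continue,
    LimitReached,
}

struct ToyLimitedCoalescer {
    fetch: Option<usize>, // maximum number of rows to emit, if any
    emitted: usize,       // rows accepted so far across all pushes
    buffer: Vec<i64>,     // stands in for buffered Arrow arrays
}

impl ToyLimitedCoalescer {
    fn new(fetch: Option<usize>) -> Self {
        Self { fetch, emitted: 0, buffer: Vec::new() }
    }

    // Push a batch together with its boolean selection mask, avoiding a
    // separate filter pass followed by a plain push.
    fn push_batch_with_filter(&mut self, batch: &[i64], mask: &[bool]) -> PushStatus {
        for (value, keep) in batch.iter().zip(mask) {
            if !*keep {
                continue;
            }
            self.buffer.push(*value);
            self.emitted += 1;
            if let Some(fetch) = self.fetch {
                if self.emitted >= fetch {
                    // Enough rows have been produced; the rest of the input can be dropped.
                    return PushStatus::LimitReached;
                }
            }
        }
        PushStatus::Continue
    }
}

fn main() {
    let mut coalescer = ToyLimitedCoalescer::new(Some(3));
    let status =
        coalescer.push_batch_with_filter(&[1, 2, 3, 4, 5], &[true, false, true, true, true]);
    // The third selected row satisfies the limit before the batch is exhausted.
    assert_eq!(status, PushStatus::LimitReached);
    assert_eq!(coalescer.buffer, vec![1, 3, 4]);
}
```

A real version would work on Arrow `RecordBatch`es and boolean arrays (e.g. via `filter_record_batch`) and keep the coalescing behaviour of the existing `push_batch`; the sketch only shows the control flow.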
20 changes: 9 additions & 11 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -6022,10 +6022,9 @@ physical_plan
07)------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
08)--------------CoalescePartitionsExec
09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
10)------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true

query I
SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c3 order by c3 limit 4;
@@ -7165,13 +7164,12 @@ logical_plan
03)----Aggregate: groupBy=[[having_test.v1, having_test.v2]], aggr=[[max(having_test.v1)]]
04)------TableScan: having_test projection=[v1, v2]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: max(having_test.v1)@2 = 3, projection=[v1@0, v2@1]
03)----AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=1
06)----------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
01)FilterExec: max(having_test.v1)@2 = 3, projection=[v1@0, v2@1]
02)--AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]


query error