From 8733405ba9972dcf8c65bec5d51f9ebd65982acf Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 9 May 2023 13:18:35 +0100 Subject: [PATCH 1/2] Parallel merge sort (#6162) --- datafusion/core/src/physical_plan/common.rs | 26 ++++++++++++++ .../core/src/physical_plan/sorts/sort.rs | 8 +++-- .../sorts/sort_preserving_merge.rs | 36 +++++-------------- 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 42cd8fada96d..84a7d180a1d8 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -22,6 +22,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; use crate::execution::memory_pool::MemoryReservation; use crate::physical_plan::metrics::MemTrackingMetrics; +use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; @@ -180,6 +181,31 @@ pub(crate) fn spawn_execution( }) } +/// If running in a tokio context spawns the execution of `stream` to a separate task +/// allowing it to execute in parallel with an intermediate buffer of size `buffer` +pub(crate) fn spawn_buffered( + mut input: SendableRecordBatchStream, + buffer: usize, +) -> SendableRecordBatchStream { + // Use tokio only if running from a tokio context (#2201) + let handle = match tokio::runtime::Handle::try_current() { + Ok(handle) => handle, + Err(_) => return input, + }; + + let schema = input.schema(); + let (sender, receiver) = mpsc::channel(buffer); + let join = handle.spawn(async move { + while let Some(item) = input.next().await { + if sender.send(item).await.is_err() { + return; + } + } + }); + + RecordBatchReceiverStream::create(&schema, receiver, join) +} + /// Computes the statistics for an in-memory RecordBatch /// /// Only computes statistics that are in arrows metadata (num rows, byte size and nulls) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 12018944d4cd..544abb24740d 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -25,7 +25,7 @@ use crate::execution::memory_pool::{ human_readable_size, MemoryConsumer, MemoryReservation, }; use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::{batch_byte_size, IPCWriter}; +use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet, @@ -284,11 +284,13 @@ impl ExternalSorter { self.partition_id, &self.runtime.memory_pool, ); - sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics) + Ok(spawn_buffered( + sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?, + 1, + )) }) .collect::>()?; - // TODO: Run batch sorts concurrently (#6162) // TODO: Pushdown fetch to streaming merge (#6000) streaming_merge( diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index b78058360184..966ebf09da41 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -22,18 +22,17 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use log::{debug, trace}; -use tokio::sync::mpsc; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; +use crate::physical_plan::common::spawn_buffered; use crate::physical_plan::metrics::{ ExecutionPlanMetricsSet, MemTrackingMetrics, MetricsSet, }; use crate::physical_plan::sorts::streaming_merge; -use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType, - Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, + expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan, + Partitioning, SendableRecordBatchStream, Statistics, }; use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; @@ -181,29 +180,12 @@ impl ExecutionPlan for SortPreservingMergeExec { result } _ => { - // Use tokio only if running from a tokio context (#2201) - let receivers = match tokio::runtime::Handle::try_current() { - Ok(_) => (0..input_partitions) - .map(|part_i| { - let (sender, receiver) = mpsc::channel(1); - let join_handle = spawn_execution( - self.input.clone(), - sender, - part_i, - context.clone(), - ); - - RecordBatchReceiverStream::create( - &schema, - receiver, - join_handle, - ) - }) - .collect(), - Err(_) => (0..input_partitions) - .map(|partition| self.input.execute(partition, context.clone())) - .collect::>()?, - }; + let receivers = (0..input_partitions) + .map(|partition| { + let stream = self.input.execute(partition, context.clone())?; + Ok(spawn_buffered(stream, 1)) + }) + .collect::>()?; debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute"); From bbde12d29a35a6698d1581a8e129bea547aad377 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 9 May 2023 15:33:33 +0100 Subject: [PATCH 2/2] Fix test --- .../core/src/physical_plan/sorts/sort_preserving_merge.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 966ebf09da41..88916c93f0f4 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -244,6 +244,7 @@ mod tests { use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::metrics::MetricValue; use crate::physical_plan::sorts::sort::SortExec; + use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{collect, common}; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; @@ -794,7 +795,7 @@ mod tests { let mut streams = Vec::with_capacity(partition_count); for partition in 0..partition_count { - let (sender, receiver) = mpsc::channel(1); + let (sender, receiver) = tokio::sync::mpsc::channel(1); let mut stream = batches.execute(partition, task_ctx.clone()).unwrap(); let join_handle = tokio::spawn(async move { while let Some(batch) = stream.next().await {