257 changes: 229 additions & 28 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -34,28 +34,30 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream
use crate::physical_plan::sorts::SortedStream;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{
common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionConfig;
use arrow::array::ArrayRef;
use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
pub use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use futures::lock::Mutex;
use futures::StreamExt;
use futures::{Stream, StreamExt};
use log::{debug, error};
use std::any::Any;
use std::cmp::min;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::task::{Context, Poll};
use tempfile::NamedTempFile;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;
@@ -105,13 +107,21 @@ impl ExternalSorter {
}
}

async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
async fn insert_batch(
&self,
input: RecordBatch,
tracking_metrics: &MemTrackingMetrics,
) -> Result<()> {
if input.num_rows() > 0 {
let size = batch_byte_size(&input);
self.try_grow(size).await?;
self.metrics.mem_used().add(size);
let mut in_mem_batches = self.in_mem_batches.lock().await;
in_mem_batches.push(input);
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr)?;
@yjshen (Member Author, Apr 2, 2022):

A change here: sort each batch before buffering it in memory.
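
For context, here is a minimal sketch of what such a per-batch sort involves. It is an illustrative stand-in, not the exact `sort_batch` in this file, and it reuses the imports already present in the diff (`lexsort_to_indices`, `take`, `PhysicalSortExpr`):

```rust
// Illustrative per-batch sort: evaluate the sort expressions, lexsort once
// to get a row permutation, then apply it to every column with `take`.
fn sort_one_batch(
    batch: RecordBatch,
    schema: SchemaRef,
    expr: &[PhysicalSortExpr],
) -> Result<RecordBatch> {
    // Evaluate each sort expression against this single batch.
    let sort_columns = expr
        .iter()
        .map(|e| e.evaluate_to_sort_column(&batch))
        .collect::<Result<Vec<_>>>()?;
    // One lexicographic sort yields the permutation for this batch.
    let indices = lexsort_to_indices(&sort_columns, None)?;
    // `take` walks each source column once, so memory is read linearly per
    // column -- the access pattern credited for the speedup above.
    let columns = batch
        .columns()
        .iter()
        .map(|c| Ok(take(c.as_ref(), &indices, None)?))
        .collect::<Result<Vec<_>>>()?;
    Ok(RecordBatch::try_new(schema, columns)?)
}
```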

@yjshen (Member Author):

Performance would deteriorate significantly without this change:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 4619.9 ms and returned 6001214 rows
Query 1 iteration 1 took 4561.0 ms and returned 6001214 rows
Query 1 iteration 2 took 4527.7 ms and returned 6001214 rows

I think the main cause is random memory access while constructing the output batches. Without this per-batch sort, collecting cells from unsorted batches accesses memory in a fully random order. With it, we access memory linearly for each column in each batch, which results in a much more predictable access pattern and benefits the CPU cache.

I think the perf counter stats confirm the above speculation:

sudo perf stat -a -e cache-misses,cache-references,l3_cache_accesses,l3_misses,dTLB-load-misses,dTLB-loads target/release/tpch benchmark datafusion --iterations 3 --path /home/yijie/sort_test/tpch-parquet --format parquet --query 1 --batch-size 4096

Without this per-batch sort:

Performance counter stats for 'system wide':

     1,340,359,889      cache-misses              #   35.817 % of all cache refs  
     3,742,289,458      cache-references                                          
     1,984,089,839      l3_cache_accesses                                         
       540,429,658      l3_misses                                                 
       303,508,234      dTLB-load-misses          #   49.51% of all dTLB cache accesses
       613,048,439      dTLB-loads                                                

      14.222309739 seconds time elapsed

With this per-batch sort:

 Performance counter stats for 'system wide':

     1,059,913,512      cache-misses              #   30.715 % of all cache refs  
     3,450,839,405      cache-references                                          
     1,388,975,765      l3_cache_accesses                                         
       235,570,805      l3_misses                                                 
       239,390,511      dTLB-load-misses          #   51.36% of all dTLB cache accesses
       466,141,655      dTLB-loads                                                

       8.675278258 seconds time elapsed

Contributor:

It makes sense that sorting a batch in the same thread that produced it (while it is still in the cache) improves performance. Nice find @yjshen

cc @tustvold who has been observing similar things while working on scheduling I/O and CPU decoding

@yjshen (Member Author, Apr 5, 2022):

Yes, I think the performance gains are twofold:

  • Sorting and reordering the batch in the same thread, while it is still in the cache, as you mentioned.
  • The memory access pattern in the final output phase. We serially access the columns of each batch, so materializing the sort order for each incoming batch turns purely randomized collecting into sequential access of each column across all the batches, which yields better cache behavior.

in_mem_batches.push(partial);
}
Ok(())
}
@@ -124,6 +134,7 @@ impl ExternalSorter {
/// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
async fn sort(&self) -> Result<SendableRecordBatchStream> {
let partition = self.partition_id();
let batch_size = self.session_config.batch_size;
let mut in_mem_batches = self.in_mem_batches.lock().await;

if self.spilled_before().await {
@@ -136,6 +147,7 @@
&mut *in_mem_batches,
self.schema.clone(),
&self.expr,
batch_size,
tracking_metrics,
)?;
let prev_used = self.metrics.mem_used().set(0);
@@ -166,6 +178,7 @@
&mut *in_mem_batches,
self.schema.clone(),
&self.expr,
batch_size,
tracking_metrics,
);
// Report to the memory manager we are no longer using memory
@@ -200,6 +213,12 @@ impl Debug for ExternalSorter {
}
}

#[derive(Debug, Copy, Clone)]
struct CombinedIndex {
batch_idx: usize,
row_idx: usize,
}

impl Drop for ExternalSorter {
fn drop(&mut self) {
self.runtime.drop_consumer(self.id(), self.used());
@@ -249,6 +268,7 @@ impl MemoryConsumer for ExternalSorter {
&mut *in_mem_batches,
self.schema.clone(),
&*self.expr,
self.session_config.batch_size,
tracking_metrics,
);

@@ -271,33 +291,212 @@ fn in_mem_partial_sort(
buffered_batches: &mut Vec<RecordBatch>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
if buffered_batches.len() == 1 {
let result = buffered_batches.pop();
Ok(Box::pin(SizedRecordBatchStream::new(
schema,
vec![Arc::new(result.unwrap())],
tracking_metrics,
)))
} else {
let batches = buffered_batches.drain(..).collect::<Vec<_>>();
let sorted_iter = {
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
get_sorted_iter(&batches, expressions, batch_size)?
};
Ok(Box::pin(SortedSizedRecordBatchStream::new(
schema,
batches,
sorted_iter,
tracking_metrics,
)))
}
}

let result = {
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
fn get_sorted_iter(
@yjshen (Member Author, Apr 2, 2022):

The main changes:

  1. concat all sort-columns (instead of all columns)
  2. sort to get the index array (same as the original sort)
  3. use CombinedIndex to avoid huge batch construction (to access records scattered in different batches)
  4. construct a small batch at a time (a toy illustration of step 3 follows below).
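
A toy illustration of step 3 (hypothetical values, reusing the `CombinedIndex` struct from this diff): `lexsort_to_indices` returns positions into the flattened row space, and the `combined` vector maps each position back to a (batch, row) pair without materializing one huge batch:

```rust
// Two batches of 3 rows each flatten to global positions 0..6:
// positions 0..3 come from batch 0, positions 3..6 from batch 1.
let combined: Vec<CombinedIndex> = (0..2)
    .flat_map(|batch_idx| {
        (0..3).map(move |row_idx| CombinedIndex { batch_idx, row_idx })
    })
    .collect();

// If the sorted index array says the next output row is global position 4...
let c = combined[4];
// ...it is row 1 of batch 1.
assert_eq!((c.batch_idx, c.row_idx), (1, 1));
```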

Contributor:

What about the extreme case, where the number of sort columns is equal or close to the total number of columns?
In that case, are we a bit worse off now than before, because we need to concat the sort columns anyway?

@yjshen (Member Author):

I think in both cases (this PR and the current master), we need to concat the sort columns before the lexsort; on current master, the concat happens while constructing the single huge record batch.

For `select a, b from table order by a, b`, this PR spends memory on the `Vec<CombinedIndex>` but avoids the `take` over huge arrays that does the actual reordering. So I think this PR behaves consistently across the different mixes of sort columns and payload columns?

@alamb (Contributor, Apr 5, 2022):

I would say that peak memory usage is no different in the worst case (all columns are being sorted), as the implementation on master copies the entire input into a new record batch with all columns, as well as evaluating the SortExprs into their own area.

However, in the common case where not all columns are part of the sort key, this implementation will use significantly less peak memory.

Contributor:

Yeah, my intuition is that worst-case memory usage will be similar. My concern was that we use MutableArrayData in the new implementation, but I realize we only use it for the non-sort columns. So in the above case (sort by all columns) the implementation is almost the same.

batches: &[RecordBatch],
expr: &[PhysicalSortExpr],
batch_size: usize,
) -> Result<SortedIterator> {
let (batch_grouped, combined): (Vec<Vec<ArrayRef>>, Vec<Vec<CombinedIndex>>) =
batches
.iter()
.enumerate()
.map(|(i, batch)| {
let col: Vec<ArrayRef> = expr
.iter()
.map(|e| Ok(e.evaluate_to_sort_column(batch)?.values))
.collect::<Result<Vec<_>>>()?;

let combined_index = (0..batch.num_rows())
.map(|r| CombinedIndex {
batch_idx: i,
row_idx: r,
})
.collect::<Vec<CombinedIndex>>();

Ok((col, combined_index))
})
.collect::<Result<Vec<(Vec<ArrayRef>, Vec<CombinedIndex>)>>>()?
.into_iter()
.unzip();

let column_grouped = transpose(batch_grouped);

let sort_columns: Vec<SortColumn> = column_grouped
.iter()
.zip(expr.iter())
.map(|(c, e)| {
Ok(SortColumn {
values: concat(
&*c.iter().map(|i| i.as_ref()).collect::<Vec<&dyn Array>>(),
)?,
options: Some(e.options),
})
})
.collect::<Result<Vec<_>>>()?;

let indices = lexsort_to_indices(&sort_columns, None)?;
let combined = combined
.into_iter()
.flatten()
.collect::<Vec<CombinedIndex>>();
Ok(SortedIterator::new(indices, combined, batch_size))
}

let pre_sort = if buffered_batches.len() == 1 {
buffered_batches.pop()
} else {
let batches = buffered_batches.drain(..).collect::<Vec<_>>();
// combine all record batches into one for each column
common::combine_batches(&batches, schema.clone())?
};
struct SortedIterator {
pos: usize,
indices: UInt32Array,
combined: Vec<CombinedIndex>,
batch_size: usize,
length: usize,
}

pre_sort
.map(|batch| sort_batch(batch, schema.clone(), expressions))
.transpose()?
};
impl SortedIterator {
fn new(
indices: UInt32Array,
combined: Vec<CombinedIndex>,
batch_size: usize,
) -> Self {
let length = combined.len();
Self {
pos: 0,
indices,
combined,
batch_size,
length,
}
}
}

Ok(Box::pin(SizedRecordBatchStream::new(
schema,
vec![Arc::new(result.unwrap())],
tracking_metrics,
)))
impl Iterator for SortedIterator {
type Item = Vec<CombinedIndex>;

fn next(&mut self) -> Option<Self::Item> {
if self.pos >= self.length {
return None;
}

let current_size = min(self.batch_size, self.length - self.pos);
let mut result = Vec::with_capacity(current_size);
for i in 0..current_size {
let p = self.pos + i;
let c_index = self.indices.value(p) as usize;
result.push(self.combined[c_index])
}
self.pos += current_size;
Some(result)
}
}
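
For example (illustrative values): with 5 buffered rows and `batch_size = 2`, the iterator yields chunks of 2, 2, and 1 `CombinedIndex` entries, each chunk already in final sort order:

```rust
// Sketch of the chunking behavior; the permutation values are made up.
let indices = UInt32Array::from(vec![4, 2, 0, 3, 1]);
let combined: Vec<CombinedIndex> = (0..5)
    .map(|row_idx| CombinedIndex { batch_idx: 0, row_idx })
    .collect();
let mut iter = SortedIterator::new(indices, combined, 2);
assert_eq!(iter.next().map(|chunk| chunk.len()), Some(2)); // rows 4 and 2
assert_eq!(iter.next().map(|chunk| chunk.len()), Some(2)); // rows 0 and 3
assert_eq!(iter.next().map(|chunk| chunk.len()), Some(1)); // row 1
assert!(iter.next().is_none());
```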

/// Stream of sorted record batches
struct SortedSizedRecordBatchStream {
schema: SchemaRef,
batches: Vec<RecordBatch>,
sorted_iter: SortedIterator,
num_cols: usize,
metrics: MemTrackingMetrics,
}

impl SortedSizedRecordBatchStream {
/// Create a new `SortedSizedRecordBatchStream`
pub fn new(
schema: SchemaRef,
batches: Vec<RecordBatch>,
sorted_iter: SortedIterator,
metrics: MemTrackingMetrics,
) -> Self {
let size = batches.iter().map(batch_byte_size).sum::<usize>();
metrics.init_mem_used(size);
let num_cols = batches[0].num_columns();
SortedSizedRecordBatchStream {
schema,
batches,
sorted_iter,
num_cols,
metrics,
}
}
}

impl Stream for SortedSizedRecordBatchStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.sorted_iter.next() {
None => Poll::Ready(None),
Some(combined) => {
let mut output = Vec::with_capacity(self.num_cols);
for i in 0..self.num_cols {
let arrays = self
.batches
.iter()
.map(|b| b.column(i).data())
.collect::<Vec<_>>();

Contributor: this is very clever 👍 👍

let mut mutable =
    MutableArrayData::new(arrays, false, combined.len());
for x in combined.iter() {
mutable.extend(x.batch_idx, x.row_idx, x.row_idx + 1);
}
output.push(make_array(mutable.freeze()))
}
let batch = RecordBatch::try_new(self.schema.clone(), output);
let poll = Poll::Ready(Some(batch));
self.metrics.record_poll(poll)
}
}
}
}
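
A standalone sketch of the gather that `poll_next` performs: `MutableArrayData::new` takes the source `ArrayData`s, and `extend(batch_idx, start, end)` copies a half-open row range from the chosen source (toy arrays, not code from the PR):

```rust
use arrow::array::{make_array, Array, Int32Array, MutableArrayData};

// Two source arrays stand in for one column across two buffered batches.
let batch0 = Int32Array::from(vec![10, 20, 30]);
let batch1 = Int32Array::from(vec![40, 50, 60]);
let sources = vec![batch0.data(), batch1.data()];

// Gather rows in the order the sorted (batch_idx, row_idx) pairs dictate.
let mut mutable = MutableArrayData::new(sources, false, 4);
mutable.extend(1, 0, 1); // 40
mutable.extend(0, 2, 3); // 30
mutable.extend(1, 2, 3); // 60
mutable.extend(0, 0, 1); // 10

let gathered = make_array(mutable.freeze());
// `gathered` is now the Int32 array [40, 30, 60, 10].
```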

impl RecordBatchStream for SortedSizedRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

fn transpose<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
assert!(!v.is_empty());
let len = v[0].len();
let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect();
(0..len)
.map(|_| {
iters
.iter_mut()
.map(|n| n.next().unwrap())
.collect::<Vec<T>>()
})
.collect()
}
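
`transpose` regroups the nested vectors from [batch][expr] to [expr][batch], so that each sort expression's arrays can be concatenated across batches. A minimal check (illustrative values):

```rust
let per_batch = vec![vec!["b0e0", "b0e1"], vec!["b1e0", "b1e1"]];
assert_eq!(
    transpose(per_batch),
    vec![vec!["b0e0", "b1e0"], vec!["b0e1", "b1e1"]]
);
```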

async fn spill_partial_sorted_stream(
Expand Down Expand Up @@ -576,6 +775,8 @@ async fn do_sort(
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let tracking_metrics =
metrics_set.new_intermediate_tracking(partition_id, context.runtime_env());
let sorter = ExternalSorter::new(
partition_id,
schema.clone(),
@@ -587,7 +788,7 @@
context.runtime_env().register_requester(sorter.id());
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
sorter.insert_batch(batch, &tracking_metrics).await?;
}
sorter.sort().await
}