173 changes: 111 additions & 62 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -35,6 +35,7 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take};
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use arrow_schema::ArrowError;
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_execution::memory_pool::{
human_readable_size, MemoryConsumer, MemoryReservation,
@@ -45,14 +46,14 @@ use datafusion_physical_expr::EquivalenceProperties;
use futures::{StreamExt, TryStreamExt};
use log::{debug, error, trace};
use std::any::Any;
use std::ffi::OsString;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::sync::mpsc::Sender;
use tokio::task;

struct ExternalSorterMetrics {
@@ -85,6 +86,10 @@ impl ExternalSorterMetrics {
///
/// 1. get a non-empty new batch from input
///
/// 1.2. if a `fetch` parameter has been provided, and the batch size
/// is larger than `fetch`, sort the incoming batch in order to
/// reduce its size and thus use less memory.
///
/// 2. check with the memory manager there is sufficient space to
/// buffer the batch in memory 2.1 if memory sufficient, buffer
/// batch in memory, go to 1.
@@ -196,10 +201,9 @@ impl ExternalSorterMetrics {
struct ExternalSorter {
/// schema of the output (and the input)
schema: SchemaRef,
/// Potentially unsorted in memory buffer
in_mem_batches: Vec<RecordBatch>,
/// if `Self::in_mem_batches` are sorted
in_mem_batches_sorted: bool,
/// A vector of tuples, with each tuple consisting of a flag
/// denoting whether the batch is sorted, and the batch itself
in_mem_batches: Vec<(bool, RecordBatch)>,
/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
spills: Vec<NamedTempFile>,
@@ -238,7 +242,6 @@ impl ExternalSorter {
Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: true,
spills: vec![],
expr: expr.into(),
metrics,
@@ -253,11 +256,19 @@ impl ExternalSorter {
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
///
/// Updates memory usage metrics, and possibly triggers spilling to disk
async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
if input.num_rows() == 0 {
return Ok(());
}

let mut batch_sorted = false;
if self.fetch.map_or(false, |f| f < input.num_rows()) {
Member

Could we make the heuristic something like `f < input.num_rows() / 10` (or some other magic number), so that eager sorting only happens for small values of `K`?

Contributor Author

I was also thinking of something similar, but along the lines of `f < input.num_rows() && f <= 100`, so that we effectively have a hard cut-off for eager sorting after 100 rows.
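
The two heuristics proposed in this thread can be compared side by side. The following is only a sketch: the function names `should_eager_sort_capped` and `should_eager_sort_ratio`, and the `max_eager_fetch` and `ratio` parameters, are hypothetical and do not appear in the PR.

```rust
/// Hard cut-off variant: sort eagerly only when `fetch` is smaller than the
/// batch and does not exceed `max_eager_fetch` (100 in the comment above).
/// Hypothetical helper, not part of the PR.
fn should_eager_sort_capped(
    fetch: Option<usize>,
    num_rows: usize,
    max_eager_fetch: usize,
) -> bool {
    match fetch {
        Some(f) => f < num_rows && f <= max_eager_fetch,
        None => false,
    }
}

/// Ratio variant: sort eagerly only when `fetch` is smaller than
/// `num_rows / ratio` (ratio = 10 in the comment above).
/// A zero ratio disables eager sorting instead of dividing by zero.
fn should_eager_sort_ratio(fetch: Option<usize>, num_rows: usize, ratio: usize) -> bool {
    ratio != 0 && fetch.map_or(false, |f| f < num_rows / ratio)
}
```

With a batch of 8192 rows, the capped variant accepts `fetch = 100` and rejects `fetch = 101`, while the ratio variant accepts any `fetch` below 819.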

// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter.
input = sort_batch(&input, &self.expr, self.fetch)?;
Contributor (@Dandandan, Aug 8, 2023)

I wonder whether you could recover part of the perf difference by concat + sorting only once every n (say every 10) batches. The selectivity of the limit and total work to be performed is much bigger for sorting 81920 vs 8192 rows, also the merging to be performed will be over fewer batches.

Contributor Author

Yeah, that's worth trying to benchmark as well. It could also be something like every n unsorted rows or every size unsorted bytes, to account for variability in batch row count.

Contributor

Yes, definitely.

Contributor Author

Fwiw, I did try out a variety of approaches along these lines, but still failed to eliminate the perf difference. There's probably a combination of parameters that is just right and could yield better results, but with the native Top-K operator in the works that likely won't be necessary.
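
The "concat + sort once every n batches" idea from this thread can be sketched without Arrow. This is a simplified model under stated assumptions: a batch is a plain `Vec<i64>` of sort keys, and the `BufferedTopK` type and its methods are hypothetical names, not code from the PR.

```rust
/// Hypothetical stand-in for a record batch: a plain vector of sort keys.
type Batch = Vec<i64>;

/// Buffers unsorted batches and, once `every_n` of them have accumulated,
/// concatenates them, sorts the result, and keeps only the first `fetch` rows.
struct BufferedTopK {
    fetch: usize,
    every_n: usize,
    pending: Vec<Batch>,     // unsorted batches waiting for the next flush
    sorted_runs: Vec<Batch>, // sorted runs, each truncated to `fetch` rows
}

impl BufferedTopK {
    fn new(fetch: usize, every_n: usize) -> Self {
        Self { fetch, every_n, pending: vec![], sorted_runs: vec![] }
    }

    fn insert(&mut self, batch: Batch) {
        if batch.is_empty() {
            return;
        }
        self.pending.push(batch);
        if self.pending.len() >= self.every_n {
            self.flush();
        }
    }

    /// Concatenate the pending batches, sort once, and apply the limit.
    fn flush(&mut self) {
        if self.pending.is_empty() {
            return;
        }
        let mut merged: Batch = self.pending.drain(..).flatten().collect();
        merged.sort_unstable();
        merged.truncate(self.fetch);
        self.sorted_runs.push(merged);
    }

    /// Combine all runs into the final top-`fetch` result.
    /// A real implementation would merge the sorted runs instead of re-sorting.
    fn finish(mut self) -> Batch {
        self.flush();
        let mut all: Batch = self.sorted_runs.into_iter().flatten().collect();
        all.sort_unstable();
        all.truncate(self.fetch);
        all
    }
}
```

Each flush sorts `every_n` batches' worth of rows but retains at most `fetch` of them, which is where the reduced memory use and the smaller final merge would come from.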

batch_sorted = true;
}

let size = batch_byte_size(&input);
if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
@@ -279,8 +290,7 @@ impl ExternalSorter {
}
}

self.in_mem_batches.push(input);
self.in_mem_batches_sorted = false;
self.in_mem_batches.push((batch_sorted, input));
Ok(())
}

@@ -307,8 +317,9 @@ impl ExternalSorter {
}

for spill in self.spills.drain(..) {
let stream = read_spill_as_stream(spill, self.schema.clone())?;
streams.push(stream);
let spill_streams =
read_spill_as_streams(spill.path(), self.schema.clone())?;
streams.extend(spill_streams);
}

streaming_merge(
@@ -345,7 +356,7 @@ impl ExternalSorter {
}

/// Writes any `in_memory_batches` to a spill file and clears
/// the batches. The contents of the spil file are sorted.
/// the batches. The contents of the spill file are sorted.
///
/// Returns the amount of memory freed.
async fn spill(&mut self) -> Result<usize> {
@@ -359,7 +370,11 @@ impl ExternalSorter {
self.in_mem_sort().await?;

let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);

let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
assert!(sorted.iter().all(|&s| s));

spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
let used = self.reservation.free();
self.metrics.spill_count.add(1);
@@ -370,23 +385,27 @@ impl ExternalSorter {

/// Sorts the in_mem_batches in place
async fn in_mem_sort(&mut self) -> Result<()> {
if self.in_mem_batches_sorted {
let batch_count = self.in_mem_batches.len();
let all_batches_sorted = self.in_mem_batches.iter().all(|(sorted, _)| *sorted);
if batch_count == 0 || all_batches_sorted {
return Ok(());
}

self.in_mem_batches = self
.in_mem_sort_stream(self.metrics.baseline.intermediate())?
.try_collect()
.await?;
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|batch| (true, batch))
.collect();

let size: usize = self
.in_mem_batches
.iter()
.map(|x| x.get_array_memory_size())
.map(|(_, x)| x.get_array_memory_size())
.sum();

self.reservation.resize(size);
self.in_mem_batches_sorted = true;
Ok(())
}

@@ -454,8 +473,8 @@ impl ExternalSorter {
) -> Result<SendableRecordBatchStream> {
assert_ne!(self.in_mem_batches.len(), 0);
if self.in_mem_batches.len() == 1 {
let batch = self.in_mem_batches.remove(0);
let stream = self.sort_batch_stream(batch, metrics)?;
let (sorted, batch) = self.in_mem_batches.remove(0);
let stream = self.sort_batch_stream(batch, sorted, metrics)?;
self.in_mem_batches.clear();
return Ok(stream);
}
@@ -465,16 +484,23 @@ impl ExternalSorter {
// This is a very rough heuristic and likely could be refined further
if self.reservation.size() < 1048576 {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
let (_, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
let batch = concat_batches(&self.schema, &batches)?;
self.in_mem_batches.clear();
return self.sort_batch_stream(batch, metrics);
// Even if all individual batches were themselves sorted the resulting concatenated one
// isn't guaranteed to be sorted, so we must perform sorting on the stream.
return self.sort_batch_stream(batch, false, metrics);
Contributor

Another approach might be to not use the concat-in-place heuristic if there are any previously sorted batches. Right now the code only checks whether the overall size is less than 1MB; it could also check whether there is any `true` in `in_mem_batches` and, if there is, use the merge path below.

Contributor Author

Good point; will try to benchmark that change too.

Contributor Author

So I did try to test this approach as well, and then saw some improvements that seemed too good to be true. I went and re-ran the benchmarks again and the improvements held, until they didn't at some point 🤷🏻‍♂️ (fwiw I'm running the benchmarks on a cloud VM, not dedicated hardware).

In hindsight, the sorting benchmarks actually do not use a memory limit, so there were no spills and this code path wasn't exercised. I did try running the benchmarks with memory limits on, but then I hit a `Dictionary replacement detected when writing IPC file format.` arrow error during spilling. It seems like this is a general problem, as it happens on the main branch too, though I haven't investigated further.

Either way, I'll add this check now even without doing benchmarking on it because it seems it can only help.
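
The check proposed in this thread amounts to a small decision function. The sketch below is illustrative only: `SortPath`, `choose_sort_path`, and `CONCAT_THRESHOLD_BYTES` are hypothetical names, and the threshold simply mirrors the `1048576` literal in the diff.

```rust
/// Mirrors the 1 MiB literal used by the size heuristic in the diff.
const CONCAT_THRESHOLD_BYTES: usize = 1_048_576;

/// The two ways of sorting the in-memory batches (hypothetical names).
#[derive(Debug, PartialEq)]
enum SortPath {
    /// Concatenate all batches into one and sort it in a single pass.
    ConcatAndSort,
    /// Sort each unsorted batch on its own and merge the resulting streams.
    MergeStreams,
}

/// Take the concat path only when the data is small AND no batch has been
/// sorted already; otherwise concatenation would throw away earlier work.
fn choose_sort_path(reserved_bytes: usize, sorted_flags: &[bool]) -> SortPath {
    let any_sorted = sorted_flags.iter().any(|&sorted| sorted);
    if reserved_bytes < CONCAT_THRESHOLD_BYTES && !any_sorted {
        SortPath::ConcatAndSort
    } else {
        SortPath::MergeStreams
    }
}
```

The `sorted_flags` slice stands in for the `bool` half of the `(bool, RecordBatch)` tuples stored in `in_mem_batches`.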

}

let streams = std::mem::take(&mut self.in_mem_batches)
.into_iter()
.map(|batch| {
.map(|(sorted, batch)| {
let metrics = self.metrics.baseline.intermediate();
Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
Ok(spawn_buffered(
self.sort_batch_stream(batch, sorted, metrics)?,
1,
))
})
.collect::<Result<_>>()?;

@@ -492,27 +518,34 @@ impl ExternalSorter {
fn sort_batch_stream(
&self,
batch: RecordBatch,
sorted: bool,
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
let schema = batch.schema();

let mut reservation =
MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(batch.get_array_memory_size());

let fetch = self.fetch;
let expressions = self.expr.clone();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let sorted = sort_batch(&batch, &expressions, fetch)?;
metrics.record_output(sorted.num_rows());
drop(batch);
reservation.free();
Ok(sorted)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
if !sorted {
// Reserve some memory for sorting the batch
let mut reservation =
MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(batch.get_array_memory_size());

let fetch = self.fetch;
let expressions = self.expr.clone();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let output = sort_batch(&batch, &expressions, fetch)?;
metrics.record_output(output.num_rows());
drop(batch);
reservation.free();
Ok(output)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
} else {
let stream = futures::stream::once(futures::future::lazy(move |_| Ok(batch)));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
}

@@ -562,22 +595,6 @@ async fn spill_sorted_batches(
}
}

fn read_spill_as_stream(
path: NamedTempFile,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
let sender = builder.tx();

builder.spawn_blocking(move || {
if let Err(e) = read_spill(sender, path.path()) {
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
}
});

Ok(builder.build())
}

fn write_sorted(
batches: Vec<RecordBatch>,
path: PathBuf,
@@ -597,15 +614,47 @@ fn write_sorted(
Ok(())
}

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
/// Stream batches from spill files.
///
/// Each spill file has one or more batches. Intra-batch order is guaranteed (each one is sorted),
/// but the inter-batch ordering is not guaranteed, hence why we need to convert each batch from the
/// spill to a separate input stream for the merge-sort procedure.
Member (@yjshen, Aug 7, 2023)

Will this produce unnecessary comparisons when the inter-batch ordering in spill files is guaranteed, as in the normal case without K?

Contributor Author

I think not, because then there should be a single record batch per spill. I can add a sort counter to double-check though.

Member

A spill file is generated by `in_mem_sort()` followed by `spill()`, and `spill_sorted_batches` takes `batches: Vec<RecordBatch>` as one of its arguments, so a spill file can contain more than one batch.

Contributor Author

Ah, good point; for some reason I thought that only 1 batch gets collected from in_mem_sort_stream in case of no fetch, even though that is not a given.
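
The question in this thread is when a spill file needs one merge input per batch and when a single stream is enough. A minimal sketch of that decision follows, assuming each batch is modelled as a `Vec<i64>` that is already sorted internally; `inter_batch_ordered` and `plan_spill_streams` are hypothetical names, not functions from the PR.

```rust
/// Returns true if reading the batches back to back yields a sorted sequence,
/// i.e. the last key of each batch is <= the first key of the next one.
/// Assumes every batch is already sorted internally; empty batches are skipped.
fn inter_batch_ordered(batches: &[Vec<i64>]) -> bool {
    let non_empty: Vec<&Vec<i64>> = batches.iter().filter(|b| !b.is_empty()).collect();
    non_empty.windows(2).all(|pair| pair[0].last() <= pair[1].first())
}

/// Groups the batches of one spill file into merge inputs: a single stream
/// when the inter-batch order holds, otherwise one stream per batch.
fn plan_spill_streams(batches: Vec<Vec<i64>>) -> Vec<Vec<Vec<i64>>> {
    if inter_batch_ordered(&batches) {
        vec![batches]
    } else {
        batches.into_iter().map(|batch| vec![batch]).collect()
    }
}
```

A single stream avoids the extra comparisons the merge would otherwise spend on batches that are already in order relative to each other.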

Contributor Author

Fwiw, I did revise the approach to stream the entire spill in one stream when inter-batch order is guaranteed (which involved keeping track of this as well). I also added a hard cut-off of 100 rows for eager sorting.

sorting benchmarks
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ top-k-eager-sorting ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │ 28233.61ms │          29251.16ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort int    │ 36739.73ms │          36870.39ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort        │ 29974.54ms │          30480.98ms │     no change │
│ decimal      │            │                     │               │
│ fetch None   │            │                     │               │
│ Qsort        │ 40214.81ms │          40804.29ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │ 29429.65ms │          29875.64ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort mixed  │ 34435.12ms │          34415.10ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │  1709.85ms │           1829.22ms │  1.07x slower │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort int    │  1842.45ms │           1749.96ms │ +1.05x faster │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1680.46ms │           1677.47ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1745.56ms │           1871.46ms │  1.07x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1965.73ms │           2331.78ms │  1.19x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort mixed  │  1813.14ms │           2061.60ms │  1.14x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1712.66ms │           1829.49ms │  1.07x slower │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort int    │  1832.73ms │           1759.00ms │     no change │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1699.42ms │           1686.28ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1762.86ms │           1877.71ms │  1.07x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1974.07ms │           2337.96ms │  1.18x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort mixed  │  1854.64ms │           2078.75ms │  1.12x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1733.28ms │           1853.74ms │  1.07x slower │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort int    │  1845.74ms │           1797.14ms │     no change │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1685.38ms │           1712.97ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1799.54ms │           1907.62ms │  1.06x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  2008.06ms │           2371.06ms │  1.18x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort mixed  │  1855.62ms │           2110.55ms │  1.14x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  1728.54ms │           1760.71ms │     no change │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort int    │  1867.64ms │           1902.02ms │     no change │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort        │  1691.19ms │           1708.74ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort        │  1784.79ms │           1830.82ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort utf8   │  1999.16ms │           2034.22ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort mixed  │  1869.82ms │           1914.20ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort utf8   │  1774.79ms │           1814.19ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort int    │  1959.51ms │           1999.51ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1714.06ms │           1730.36ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1940.85ms │           1969.82ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort utf8   │  2103.61ms │           2132.59ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort mixed  │  2032.56ms │           2087.27ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
└──────────────┴────────────┴─────────────────────┴───────────────┘

Overall the times are not that impressive, and the implementation is quite convoluted at this point, so I wouldn't merge this either.

fn read_spill_as_streams(
path: &Path,
schema: SchemaRef,
) -> Result<Vec<SendableRecordBatchStream>> {
let file = BufReader::new(File::open(path)?);
let reader = FileReader::try_new(file, None)?;

let mut streams = vec![];
let file_path = path.as_os_str().to_os_string();
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
.map_err(|e| DataFusionError::Execution(format!("{e}")))?;
let stream = build_receiver_stream(batch, schema.clone(), file_path.clone());
streams.push(stream);
}
Ok(())
Ok(streams)
}

fn build_receiver_stream(
maybe_batch: Result<RecordBatch, ArrowError>,
schema: SchemaRef,
file_path: OsString,
) -> SendableRecordBatchStream {
let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 2);
let sender = builder.tx();

builder.spawn_blocking(move || {
if let Err(e) = sender
.blocking_send(maybe_batch.map_err(Into::into))
.map_err(|e| DataFusionError::Execution(format!("{e}")))
{
error!(
"Failure while reading spill file: {:?}. Error: {}",
file_path, e
);
}
});
builder.build()
}

/// Sort execution plan.