104 changes: 69 additions & 35 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -87,8 +87,7 @@ impl ExternalSorterMetrics {
/// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
struct ExternalSorter {
schema: SchemaRef,
in_mem_batches: Vec<RecordBatch>,
in_mem_batches_sorted: bool,
in_mem_batches: Vec<(bool, RecordBatch)>,
spills: Vec<NamedTempFile>,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
@@ -118,7 +117,6 @@ impl ExternalSorter {
Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: true,
spills: vec![],
expr: expr.into(),
metrics,
@@ -133,11 +131,28 @@ impl ExternalSorter {
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
///
/// Updates memory usage metrics, and possibly triggers spilling to disk
async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
if input.num_rows() == 0 {
return Ok(());
}

let mut batch_sorted = false;
if self.fetch.map_or(false, |f| f < input.num_rows()) {
Member

I'm thinking: can we make the heuristic f < input.num_rows() / 10, or some similar magic number, so that we only do the eager sort for small `k`s?

Contributor Author

I was also thinking something similar, but along the lines of f < input.num_rows() && f <= 100, so that we effectively have a hard cut-off for eager sorting after 100 rows.
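
For concreteness, a minimal sketch of such a hard cut-off (the helper name and the value 100 are only illustrative, taken from this discussion rather than from the PR):

fn should_sort_eagerly(fetch: Option<usize>, num_rows: usize) -> bool {
    // Hypothetical threshold floated above: only eagerly sort when the
    // fetch/LIMIT is both smaller than the batch and reasonably small overall.
    const EAGER_SORT_MAX_FETCH: usize = 100;
    fetch.map_or(false, |f| f < num_rows && f <= EAGER_SORT_MAX_FETCH)
}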

// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter; first perform a memory reservation
// for the sorting procedure.
let mut reservation =
Contributor

I don't think we need a new consumer here for each batch insertion -- we could just update the main reservation on self.reservation

So, something like: do the sort first, and then update the size based on the limited output:

            input = sort_batch(&input, &self.expr, self.fetch)?;
            reservation.try_grow(input.get_array_memory_size());

Note the accounting is reworked in #7130

Contributor Author (@gruuya, Aug 5, 2023)

I think the reasoning for the new consumer was that we only want to reserve a bit of memory briefly, to account for the overhead of keeping both the sorted and the original batch around at the same time. Given that in this case we drop the old batch asap (due to the input re-assignment) in favor of a smaller/truncated one, I'd agree that a new consumer is not needed here. (The same could be said about the consumer/reservation in sort_batch_stream, though in that case, since there isn't necessarily a LIMIT, we could end up holding two same-sized batches at one point in time.)

That said, reservation.try_grow(input.get_array_memory_size()) does then get called immediately below to check whether the new (sorted/truncated) batch can be kept in memory without breaking the configured limit, so I don't think there's a need for another try_grow prior to that.

Contributor

Ah that makes sense

I think you could use the (very newly added) Reservation::split() to track the short allocation and free it on drop, without needing an entirely new consumer:

https://github.com/apache/arrow-datafusion/blob/23547587c2773ddddb9b16cba1eb8ebf2eebd85a/datafusion/execution/src/memory_pool/mod.rs#L225-L231

perhaps like

let batch_reservation = self.reservation.split(input.get_array_memory_size());
input = sort_batch(&input, &self.expr, self.fetch)?;
// free allocation for previous input
// (it would also be freed on drop so the free isn't necessary but may be clearer)
batch_reservation.free();

Contributor Author (@gruuya, Aug 5, 2023)

Hmm, so I went ahead and added this, but then it seemed to me that the logic of split is backwards (for this case at least)? In particular, the size of the reservation is not free memory but used memory, right? And with split + free we'd effectively deduct a piece of memory from our accounting that was previously allocated elsewhere (without actually freeing that memory).

Atm I went with skipping reservation updates altogether in this case; the reason being that we're already holding the input batch in memory and we are about to (try to) allocate memory for it right after sorting it anyway. Hence, the sole purpose of that reservation seems to be to account for the extra piece of memory needed due to non-in-place sorting (i.e. briefly having both the old and the new batch present at the same time), and that piece of memory is guaranteed to be less than the batch size. Let me know if you'd like me to do something else.

I also changed that test's output. Now all tests should be passing, so it's probably time for the regular benchmarks.

Contributor

> Atm I went with skipping reservation updates altogether in this case;

I think this makes sense -- especially since the extra memory for batches is likely small, given that k is very often small.

MemoryConsumer::new(format!("insert_batch{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(input.get_array_memory_size());
// Maybe we should perform sorting in a parallel task to unblock the caller
Contributor

If the parallelization is working correctly, all the other cores should be busy doing something useful here -- so sorting in another task may not be warranted, but I am not sure.

Contributor Author

Ah yeah, I think that makes sense; will remove the comment.

input = sort_batch(&input, &self.expr, self.fetch)?;
Contributor (@Dandandan, Aug 8, 2023)

I wonder whether you could recover part of the perf difference by concat + sorting only once every n (say every 10) batches. The selectivity of the limit relative to the total work to be performed is much higher when sorting 81920 vs 8192 rows, and the merging to be performed afterwards will also be over fewer batches.

Contributor Author

Yeah, that's worth trying to benchmark as well. It could also be something like every n unsorted rows, or every `size` unsorted bytes, to accommodate variability in batch row counts.

Contributor

Yes, definitely.

Contributor Author

Fwiw, I did try out a variety of approaches along these lines, but still failed to eliminate the perf difference. There's probably a combination of parameters that is just right and could yield better results, but with the native Top-K operator in the works that likely won't be necessary.
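
For reference, a rough sketch of the batched-concat variant discussed above (the unsorted_buffer field and the threshold of 10 are hypothetical, not part of this PR):

// Inside insert_batch: accumulate raw batches and only concat + sort
// once every N of them, so the fetch limit is applied to a larger input.
const SORT_EVERY_N_BATCHES: usize = 10;

self.unsorted_buffer.push(input);
if self.unsorted_buffer.len() >= SORT_EVERY_N_BATCHES {
    let combined = concat_batches(&self.schema, &self.unsorted_buffer)?;
    self.unsorted_buffer.clear();
    let sorted = sort_batch(&combined, &self.expr, self.fetch)?;
    self.in_mem_batches.push((true, sorted));
}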

reservation.free();
batch_sorted = true;
}

let size = batch_byte_size(&input);
if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
@@ -159,8 +174,7 @@ impl ExternalSorter {
}
}

self.in_mem_batches.push(input);
self.in_mem_batches_sorted = false;
self.in_mem_batches.push((batch_sorted, input));
Ok(())
}

@@ -224,7 +238,11 @@ impl ExternalSorter {
self.in_mem_sort().await?;

let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);

let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
assert!(sorted.iter().all(|&s| s));

spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
let used = self.reservation.free();
self.metrics.spill_count.add(1);
@@ -235,23 +253,25 @@ impl ExternalSorter {

/// Sorts the in_mem_batches in place
async fn in_mem_sort(&mut self) -> Result<()> {
if self.in_mem_batches_sorted {
if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) {
return Ok(());
}

self.in_mem_batches = self
.in_mem_sort_stream(self.metrics.baseline.intermediate())?
.try_collect()
.await?;
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|batch| (true, batch))
.collect();

let size: usize = self
.in_mem_batches
.iter()
.map(|x| x.get_array_memory_size())
.map(|(_, x)| x.get_array_memory_size())
.sum();

self.reservation.resize(size);
self.in_mem_batches_sorted = true;
Ok(())
}

@@ -262,8 +282,8 @@ impl ExternalSorter {
) -> Result<SendableRecordBatchStream> {
assert_ne!(self.in_mem_batches.len(), 0);
if self.in_mem_batches.len() == 1 {
let batch = self.in_mem_batches.remove(0);
let stream = self.sort_batch_stream(batch, metrics)?;
let (sorted, batch) = self.in_mem_batches.remove(0);
let stream = self.sort_batch_stream(batch, sorted, metrics)?;
self.in_mem_batches.clear();
return Ok(stream);
}
Expand All @@ -273,16 +293,23 @@ impl ExternalSorter {
// This is a very rough heuristic and likely could be refined further
if self.reservation.size() < 1048576 {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
let (_, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
let batch = concat_batches(&self.schema, &batches)?;
self.in_mem_batches.clear();
return self.sort_batch_stream(batch, metrics);
// Even if all individual batches were themselves sorted the resulting concatenated one
// isn't guaranteed to be sorted, so we must perform sorting on the stream.
return self.sort_batch_stream(batch, false, metrics);
Contributor

Another approach might be to not use the concat-in-place heuristic if there are any previously sorted batches -- right now the code only checks whether the overall size is less than 1MB -- it could also check whether there is any true in in_mem_batches, and if there is, use the merge path below.

Contributor Author

Good point; will try to benchmark that change too.

Contributor Author

So I did try to test this approach as well, and saw some improvements that seemed too good to be true. I went and re-ran the benchmarks and the improvements held, until they didn't at some point 🤷🏻‍♂️ (fwiw I'm running the benchmarks on a cloud VM, not dedicated hardware).

In hindsight, the sorting benchmarks actually do not use a memory limit, so there were no spills and this code path wasn't exercised. I did try running the benchmarks with memory limits on, but then I hit a `Dictionary replacement detected when writing IPC file format` arrow error during spilling. It seems to be a general problem, as it happens on the main branch too, though I haven't investigated further.

Either way, I'll add this check now even without doing benchmarking on it because it seems it can only help.
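
A minimal sketch of what that extra check could look like (names are illustrative, not the final implementation):

// Skip the concat-in-place fast path if any batch was already eagerly
// sorted, so that earlier sorting work is preserved by the merge path.
let any_presorted = self.in_mem_batches.iter().any(|(sorted, _)| *sorted);
if self.reservation.size() < 1048576 && !any_presorted {
    // concatenate all in-memory batches and sort them in one go
} else {
    // otherwise fall through to the streaming merge below
}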

}

let streams = std::mem::take(&mut self.in_mem_batches)
.into_iter()
.map(|batch| {
.map(|(sorted, batch)| {
let metrics = self.metrics.baseline.intermediate();
Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
Ok(spawn_buffered(
self.sort_batch_stream(batch, sorted, metrics)?,
1,
))
})
.collect::<Result<_>>()?;

@@ -299,27 +326,34 @@ impl ExternalSorter {
fn sort_batch_stream(
&self,
batch: RecordBatch,
sorted: bool,
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
let schema = batch.schema();

let mut reservation =
MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(batch.get_array_memory_size());

let fetch = self.fetch;
let expressions = self.expr.clone();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let sorted = sort_batch(&batch, &expressions, fetch)?;
metrics.record_output(sorted.num_rows());
drop(batch);
reservation.free();
Ok(sorted)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
if !sorted {
// Reserve some memory for sorting the batch
let mut reservation =
MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(batch.get_array_memory_size());

let fetch = self.fetch;
let expressions = self.expr.clone();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let output = sort_batch(&batch, &expressions, fetch)?;
metrics.record_output(output.num_rows());
drop(batch);
reservation.free();
Ok(output)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
} else {
let stream = futures::stream::once(futures::future::lazy(move |_| Ok(batch)));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
}
