Skip to content

Commit ac0aea2

Browse files
committed
Account for memory usage in SortPreservingMerge
1 parent 48766c9 commit ac0aea2

File tree

14 files changed

+576
-71
lines changed

14 files changed

+576
-71
lines changed

datafusion/common/src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,23 @@ config_namespace! {
235235
///
236236
/// Defaults to the number of CPU cores on the system
237237
pub planning_concurrency: usize, default = num_cpus::get()
238+
239+
/// How much memory is set aside, for each spillable sort, to
240+
/// ensure an in-memory merge can occur. This setting has no
241+
/// if the sort can not spill (there is no `DiskManager`
242+
/// configured)
243+
///
244+
/// As part of spilling to disk, in memory data must be sorted
245+
/// / merged before writing the file. This in-memory
246+
/// sort/merge requires memory as well, so To avoid allocating
247+
/// once memory is exhausted, DataFusion sets aside this
248+
/// many bytes before.
249+
pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
250+
251+
/// When sorting, below what size should data be concatenated
252+
/// and sorted in a single RecordBatch rather than sorted in
253+
/// batches and merged.
254+
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
238255
}
239256
}
240257

datafusion/core/src/physical_plan/repartition/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec {
576576

577577
// Get existing ordering:
578578
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
579-
// Merge streams (while preserving ordering) coming from input partitions to this partition:
579+
580+
// Merge streams (while preserving ordering) coming from
581+
// input partitions to this partition:
582+
let fetch = None;
583+
let merge_reservation =
584+
MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
585+
.register(context.memory_pool());
580586
streaming_merge(
581587
input_streams,
582588
self.schema(),
583589
sort_exprs,
584590
BaselineMetrics::new(&self.metrics, partition),
585591
context.session_config().batch_size(),
586-
None,
592+
fetch,
593+
merge_reservation,
587594
)
588595
} else {
589596
Ok(Box::pin(RepartitionStream {

datafusion/core/src/physical_plan/sorts/builder.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
1919
use arrow::datatypes::SchemaRef;
2020
use arrow::record_batch::RecordBatch;
2121
use datafusion_common::Result;
22+
use datafusion_execution::memory_pool::MemoryReservation;
2223

2324
#[derive(Debug, Copy, Clone, Default)]
2425
struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
3738
/// Maintain a list of [`RecordBatch`] and their corresponding stream
3839
batches: Vec<(usize, RecordBatch)>,
3940

41+
/// Accounts for memory used by buffered batches
42+
reservation: MemoryReservation,
43+
4044
/// The current [`BatchCursor`] for each stream
4145
cursors: Vec<BatchCursor>,
4246

@@ -47,23 +51,31 @@ pub struct BatchBuilder {
4751

4852
impl BatchBuilder {
4953
/// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
50-
pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
54+
pub fn new(
55+
schema: SchemaRef,
56+
stream_count: usize,
57+
batch_size: usize,
58+
reservation: MemoryReservation,
59+
) -> Self {
5160
Self {
5261
schema,
5362
batches: Vec::with_capacity(stream_count * 2),
5463
cursors: vec![BatchCursor::default(); stream_count],
5564
indices: Vec::with_capacity(batch_size),
65+
reservation,
5666
}
5767
}
5868

5969
/// Append a new batch in `stream_idx`
60-
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
70+
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
71+
self.reservation.try_grow(batch.get_array_memory_size())?;
6172
let batch_idx = self.batches.len();
6273
self.batches.push((stream_idx, batch));
6374
self.cursors[stream_idx] = BatchCursor {
6475
batch_idx,
6576
row_idx: 0,
66-
}
77+
};
78+
Ok(())
6779
}
6880

6981
/// Append the next row from `stream_idx`
@@ -119,14 +131,16 @@ impl BatchBuilder {
119131
// We can therefore drop all but the last batch for each stream
120132
let mut batch_idx = 0;
121133
let mut retained = 0;
122-
self.batches.retain(|(stream_idx, _)| {
134+
self.batches.retain(|(stream_idx, batch)| {
123135
let stream_cursor = &mut self.cursors[*stream_idx];
124136
let retain = stream_cursor.batch_idx == batch_idx;
125137
batch_idx += 1;
126138

127139
if retain {
128140
stream_cursor.batch_idx = retained;
129141
retained += 1;
142+
} else {
143+
self.reservation.shrink(batch.get_array_memory_size());
130144
}
131145
retain
132146
});

datafusion/core/src/physical_plan/sorts/cursor.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
2121
use arrow::row::{Row, Rows};
2222
use arrow_array::types::ByteArrayType;
2323
use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
24+
use datafusion_execution::memory_pool::MemoryReservation;
2425
use std::cmp::Ordering;
2526

2627
/// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,11 @@ pub struct RowCursor {
2930
num_rows: usize,
3031

3132
rows: Rows,
33+
34+
/// tracks the reservation for the memory in the `Rows` of this
35+
/// cursor. Freed on drop
36+
#[allow(dead_code)]
37+
reservation: MemoryReservation,
3238
}
3339

3440
impl std::fmt::Debug for RowCursor {
@@ -41,12 +47,18 @@ impl std::fmt::Debug for RowCursor {
4147
}
4248

4349
impl RowCursor {
44-
/// Create a new SortKeyCursor
45-
pub fn new(rows: Rows) -> Self {
50+
/// Create a new SortKeyCursor from `rows` and the associated `reservation`
51+
pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
52+
assert_eq!(
53+
rows.size(),
54+
reservation.size(),
55+
"memory reservation mismatch"
56+
);
4657
Self {
4758
cur_row: 0,
4859
num_rows: rows.num_rows(),
4960
rows,
61+
reservation,
5062
}
5163
}
5264

datafusion/core/src/physical_plan/sorts/merge.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
3131
use arrow::record_batch::RecordBatch;
3232
use arrow_array::*;
3333
use datafusion_common::Result;
34+
use datafusion_execution::memory_pool::MemoryReservation;
3435
use futures::Stream;
3536
use std::pin::Pin;
3637
use std::task::{ready, Context, Poll};
@@ -42,14 +43,15 @@ macro_rules! primitive_merge_helper {
4243
}
4344

4445
macro_rules! merge_helper {
45-
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
46+
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
4647
let streams = FieldCursorStream::<$t>::new($sort, $streams);
4748
return Ok(Box::pin(SortPreservingMergeStream::new(
4849
Box::new(streams),
4950
$schema,
5051
$tracking_metrics,
5152
$batch_size,
5253
$fetch,
54+
$reservation,
5355
)));
5456
}};
5557
}
@@ -63,28 +65,36 @@ pub fn streaming_merge(
6365
metrics: BaselineMetrics,
6466
batch_size: usize,
6567
fetch: Option<usize>,
68+
reservation: MemoryReservation,
6669
) -> Result<SendableRecordBatchStream> {
6770
// Special case single column comparisons with optimized cursor implementations
6871
if expressions.len() == 1 {
6972
let sort = expressions[0].clone();
7073
let data_type = sort.expr.data_type(schema.as_ref())?;
7174
downcast_primitive! {
72-
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch),
73-
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch)
74-
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch)
75-
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch)
76-
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch)
75+
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
76+
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
77+
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
78+
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
79+
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
7780
_ => {}
7881
}
7982
}
8083

81-
let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
84+
let streams = RowCursorStream::try_new(
85+
schema.as_ref(),
86+
expressions,
87+
streams,
88+
reservation.new_empty(),
89+
)?;
90+
8291
Ok(Box::pin(SortPreservingMergeStream::new(
8392
Box::new(streams),
8493
schema,
8594
metrics,
8695
batch_size,
8796
fetch,
97+
reservation,
8898
)))
8999
}
90100

@@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
162172
metrics: BaselineMetrics,
163173
batch_size: usize,
164174
fetch: Option<usize>,
175+
reservation: MemoryReservation,
165176
) -> Self {
166177
let stream_count = streams.partitions();
167178

168179
Self {
169-
in_progress: BatchBuilder::new(schema, stream_count, batch_size),
180+
in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
170181
streams,
171182
metrics,
172183
aborted: false,
@@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
197208
Some(Err(e)) => Poll::Ready(Err(e)),
198209
Some(Ok((cursor, batch))) => {
199210
self.cursors[idx] = Some(cursor);
200-
self.in_progress.push_batch(idx, batch);
201-
Poll::Ready(Ok(()))
211+
Poll::Ready(self.in_progress.push_batch(idx, batch))
202212
}
203213
}
204214
}

0 commit comments

Comments
 (0)