Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions datafusion/physical-plan/benches/spill_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ fn bench_spill_io(c: &mut Criterion) {
// - Wait for the consumer to finish processing
|spill_file| {
rt.block_on(async {
let stream =
spill_manager.read_spill_as_stream(spill_file).unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.unwrap();
let _ = collect(stream).await.unwrap();
})
},
Expand Down Expand Up @@ -519,8 +520,9 @@ fn benchmark_spill_batches_for_all_codec(
)
.unwrap()
.unwrap();
let stream =
spill_manager.read_spill_as_stream(spill_file).unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.unwrap();
let _ = collect(stream).await.unwrap();
})
},
Expand Down Expand Up @@ -553,7 +555,9 @@ fn benchmark_spill_batches_for_all_codec(
let rt = Runtime::new().unwrap();
let start = Instant::now();
rt.block_on(async {
let stream = spill_manager.read_spill_as_stream(spill_file).unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.unwrap();
let _ = collect(stream).await.unwrap();
});
let read_time = start.elapsed();
Expand Down
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ impl MultiLevelMergeBuilder {
let spill_file = self.sorted_spill_files.remove(0);

// Not reserving any memory for this disk as we are not holding it in memory
self.spill_manager.read_spill_as_stream(spill_file.file)
self.spill_manager
.read_spill_as_stream(spill_file.file, None)
}

// Only in memory streams, so merge them all in a single pass
Expand Down Expand Up @@ -274,10 +275,12 @@ impl MultiLevelMergeBuilder {
.spill_manager
.clone()
.with_batch_read_buffer_capacity(buffer_size)
.read_spill_as_stream(spill.file)?;
.read_spill_as_stream(
spill.file,
Some(spill.max_record_batch_memory),
)?;
sorted_streams.push(stream);
}

let merge_sort_stream = self.create_new_merge_sort(
sorted_streams,
// If we have no sorted spill files left, this is the last run
Expand Down
47 changes: 40 additions & 7 deletions datafusion/physical-plan/src/spill/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::RecordBatchStream;
use futures::{FutureExt as _, Stream};
use log::warn;

/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
Expand All @@ -54,8 +55,16 @@ use futures::{FutureExt as _, Stream};
struct SpillReaderStream {
schema: SchemaRef,
state: SpillReaderStreamState,
/// Maximum memory size observed among spilling sorted record batches.
/// This is used for validation purposes during reading each RecordBatch from spill.
/// For context on why this value is recorded and validated,
/// see `physical_plan/sort/multi_level_merge.rs`.
max_record_batch_memory: Option<usize>,
}

// Small margin allowed to accommodate slight memory accounting variation
const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;

/// When we poll for the next batch, we will get back both the batch and the reader,
/// so we can call `next` again.
type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
Expand All @@ -76,10 +85,15 @@ enum SpillReaderStreamState {
}

impl SpillReaderStream {
fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
fn new(
schema: SchemaRef,
spill_file: RefCountedTempFile,
max_record_batch_memory: Option<usize>,
) -> Self {
Self {
schema,
state: SpillReaderStreamState::Uninitialized(spill_file),
max_record_batch_memory,
}
}

Expand Down Expand Up @@ -125,6 +139,23 @@ impl SpillReaderStream {
Ok((reader, batch)) => {
match batch {
Some(batch) => {
if let Some(max_record_batch_memory) =
self.max_record_batch_memory
{
let actual_size =
get_record_batch_memory_size(&batch);
if actual_size
> max_record_batch_memory
+ SPILL_BATCH_MEMORY_MARGIN
{
warn!(
"Record batch memory usage ({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} bytes) \n\
by more than the allowed tolerance ({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
This likely indicates a bug in memory accounting during spilling.\n\
Please report this issue in https://github.com/apache/datafusion/issues/17340."
);
}
}
self.state = SpillReaderStreamState::Waiting(reader);

Poll::Ready(Some(Ok(batch)))
Expand Down Expand Up @@ -417,7 +448,7 @@ mod tests {
let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), schema);

let batches = collect(stream).await?;
Expand Down Expand Up @@ -481,7 +512,7 @@ mod tests {
let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), dict_schema);
let batches = collect(stream).await?;
assert_eq!(batches.len(), 2);
Expand Down Expand Up @@ -512,7 +543,7 @@ mod tests {
assert!(spill_file.path().exists());
assert!(max_batch_mem > 0);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), schema);

let batches = collect(stream).await?;
Expand Down Expand Up @@ -547,7 +578,7 @@ mod tests {
let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), schema);

let batches = collect(stream).await?;
Expand Down Expand Up @@ -931,8 +962,10 @@ mod tests {
.spill_record_batch_and_finish(&batches, "Test2")?
.unwrap();

let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
let mut stream_1 =
spill_manager.read_spill_as_stream(spill_file_1, None)?;
let mut stream_2 =
spill_manager.read_spill_as_stream(spill_file_2, None)?;
stream_1.next().await;
stream_2.next().await;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/spill/spill_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,12 @@ impl SpillManager {
pub fn read_spill_as_stream(
&self,
spill_file_path: RefCountedTempFile,
max_record_batch_memory: Option<usize>,
) -> Result<SendableRecordBatchStream> {
let stream = Box::pin(cooperative(SpillReaderStream::new(
Arc::clone(&self.schema),
spill_file_path,
max_record_batch_memory,
)));

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
Expand Down