Top-K eager batch sorting #7180
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -196,10 +196,9 @@ impl ExternalSorterMetrics { | |
| struct ExternalSorter { | ||
| /// schema of the output (and the input) | ||
| schema: SchemaRef, | ||
| /// Potentially unsorted in memory buffer | ||
| in_mem_batches: Vec<RecordBatch>, | ||
| /// if `Self::in_mem_batches` are sorted | ||
| in_mem_batches_sorted: bool, | ||
| /// A vector of tuples, with each tuple consisting of a flag | ||
| /// denoting whether the batch is sorted, and the batch itself | ||
| in_mem_batches: Vec<(bool, RecordBatch)>, | ||
| /// If data has previously been spilled, the locations of the | ||
| /// spill files (in Arrow IPC format) | ||
| spills: Vec<NamedTempFile>, | ||
|
|
@@ -238,7 +237,6 @@ impl ExternalSorter { | |
| Self { | ||
| schema, | ||
| in_mem_batches: vec![], | ||
| in_mem_batches_sorted: true, | ||
| spills: vec![], | ||
| expr: expr.into(), | ||
| metrics, | ||
|
|
@@ -253,11 +251,27 @@ impl ExternalSorter { | |
| /// Appends an unsorted [`RecordBatch`] to `in_mem_batches` | ||
| /// | ||
| /// Updates memory usage metrics, and possibly triggers spilling to disk | ||
| async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> { | ||
| async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> { | ||
| if input.num_rows() == 0 { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let mut batch_sorted = false; | ||
| if self.fetch.map_or(false, |f| f < input.num_rows()) { | ||
| // Eagerly sort the batch to potentially reduce the number of rows | ||
| // after applying the fetch parameter; first perform a memory reservation | ||
| // for the sorting procedure. | ||
| let mut reservation = | ||
|
||
| MemoryConsumer::new(format!("insert_batch{}", self.partition_id)) | ||
| .register(&self.runtime.memory_pool); | ||
|
|
||
| // TODO: This should probably be try_grow (#5885) | ||
| reservation.resize(input.get_array_memory_size()); | ||
| input = sort_batch(&input, &self.expr, self.fetch)?; | ||
|
Contributor
I wonder whether you could recover part of the perf difference by concat + sorting only once every …
Contributor
Author
Yeah, that's worth trying to benchmark as well. It could also be something like every …
Contributor
Yes, definitely.
Contributor
Author
Fwiw, I did try out a variety of approaches along these lines, but still failed to eliminate the perf difference. There's probably a combination of parameters that is just right and could yield better results, but with the native Top-K operator in the works that likely won't be necessary.
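
For reference, a rough standalone sketch of the "concat + sort only once every N batches" idea discussed in this thread. This is not code from the PR: plain `Vec<i32>` stands in for Arrow `RecordBatch`es, and `TopKBuffer`, `compact_every`, and the constants are made up for illustration.

```rust
/// Sketch: accumulate batches and only pay the concat + sort cost once
/// every `compact_every` insertions, keeping just the top-k rows each time.
struct TopKBuffer {
    k: usize,
    compact_every: usize,
    batches: Vec<Vec<i32>>,
}

impl TopKBuffer {
    fn new(k: usize, compact_every: usize) -> Self {
        Self { k, compact_every, batches: Vec::new() }
    }

    fn insert_batch(&mut self, batch: Vec<i32>) {
        self.batches.push(batch);
        // Instead of eagerly sorting every incoming batch, compact periodically.
        if self.batches.len() >= self.compact_every {
            self.compact();
        }
    }

    fn compact(&mut self) {
        // Concatenate everything buffered so far, sort once, keep only k rows.
        let mut all: Vec<i32> = self.batches.drain(..).flatten().collect();
        all.sort_unstable();
        all.truncate(self.k);
        self.batches.push(all);
    }

    fn finish(mut self) -> Vec<i32> {
        self.compact();
        self.batches.pop().unwrap_or_default()
    }
}

fn main() {
    let mut buf = TopKBuffer::new(3, 4);
    for batch in [vec![9, 1, 7], vec![4, 8], vec![2, 6], vec![5, 0, 3]] {
        buf.insert_batch(batch);
    }
    assert_eq!(buf.finish(), vec![0, 1, 2]);
}
```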
||
| reservation.free(); | ||
| batch_sorted = true; | ||
| } | ||
|
|
||
| let size = batch_byte_size(&input); | ||
| if self.reservation.try_grow(size).is_err() { | ||
| let before = self.reservation.size(); | ||
|
|
@@ -279,8 +293,7 @@ impl ExternalSorter { | |
| } | ||
| } | ||
|
|
||
| self.in_mem_batches.push(input); | ||
| self.in_mem_batches_sorted = false; | ||
| self.in_mem_batches.push((batch_sorted, input)); | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -359,7 +372,11 @@ impl ExternalSorter { | |
| self.in_mem_sort().await?; | ||
|
|
||
| let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?; | ||
| let batches = std::mem::take(&mut self.in_mem_batches); | ||
|
|
||
| let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) = | ||
| std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); | ||
| assert!(sorted.iter().all(|&s| s)); | ||
|
|
||
| spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?; | ||
| let used = self.reservation.free(); | ||
| self.metrics.spill_count.add(1); | ||
|
|
@@ -370,23 +387,36 @@ impl ExternalSorter { | |
|
|
||
| /// Sorts the in_mem_batches in place | ||
| async fn in_mem_sort(&mut self) -> Result<()> { | ||
| if self.in_mem_batches_sorted { | ||
| if self.in_mem_batches.is_empty() | ||
| || self.in_mem_batches.iter().all(|(sorted, _)| *sorted) | ||
| && self.fetch.is_none() | ||
| { | ||
| // Do not sort if all the in-mem batches are sorted _and_ there was no `fetch` specified. | ||
| // If a `fetch` was specified we could hit a pathological case even if all the batches | ||
| // are sorted whereby we have ~100 in-mem batches with 1 row each (in case of `LIMIT 1`), | ||
| // and then if this gets spilled to disk it turns out this is a problem when reading | ||
| // a series of 1-row batches from the spill: | ||
| // `Failure while reading spill file: NamedTempFile("/var..."). Error: Execution error: channel closed` | ||
| // Even if a larger `fetch` was used we would likely benefit from merging the individual | ||
| // truncated batches together during sort. | ||
| return Ok(()); | ||
| } | ||
|
|
||
| self.in_mem_batches = self | ||
| .in_mem_sort_stream(self.metrics.baseline.intermediate())? | ||
| .try_collect() | ||
| .await?; | ||
| .try_collect::<Vec<_>>() | ||
| .await? | ||
| .into_iter() | ||
| .map(|batch| (true, batch)) | ||
| .collect(); | ||
|
|
||
| let size: usize = self | ||
| .in_mem_batches | ||
| .iter() | ||
| .map(|x| x.get_array_memory_size()) | ||
| .map(|(_, x)| x.get_array_memory_size()) | ||
| .sum(); | ||
|
|
||
| self.reservation.resize(size); | ||
| self.in_mem_batches_sorted = true; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -454,8 +484,8 @@ impl ExternalSorter { | |
| ) -> Result<SendableRecordBatchStream> { | ||
| assert_ne!(self.in_mem_batches.len(), 0); | ||
| if self.in_mem_batches.len() == 1 { | ||
| let batch = self.in_mem_batches.remove(0); | ||
| let stream = self.sort_batch_stream(batch, metrics)?; | ||
| let (sorted, batch) = self.in_mem_batches.remove(0); | ||
| let stream = self.sort_batch_stream(batch, sorted, metrics)?; | ||
| self.in_mem_batches.clear(); | ||
| return Ok(stream); | ||
| } | ||
|
|
@@ -465,16 +495,23 @@ impl ExternalSorter { | |
| // This is a very rough heuristic and likely could be refined further | ||
| if self.reservation.size() < 1048576 { | ||
| // Concatenate memory batches together and sort | ||
| let batch = concat_batches(&self.schema, &self.in_mem_batches)?; | ||
| let (_, batches): (Vec<bool>, Vec<RecordBatch>) = | ||
| std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); | ||
| let batch = concat_batches(&self.schema, &batches)?; | ||
| self.in_mem_batches.clear(); | ||
| return self.sort_batch_stream(batch, metrics); | ||
| // Even if all individual batches were themselves sorted the resulting concatenated one | ||
| // isn't guaranteed to be sorted, so we must perform sorting on the stream. | ||
| return self.sort_batch_stream(batch, false, metrics); | ||
|
Contributor
Another approach might be to not use the …
Contributor
Author
Good point; will try to benchmark that change too.
Contributor
Author
So I did try to test this approach as well, and then saw some improvements that seemed too good to be true. I went and re-ran the benchmarks again and the improvements held, until they didn't at some point 🤷🏻♂️ (fwiw I'm running the benchmarks on a cloud VM, not dedicated hardware). In hindsight, the sorting benchmarks actually do not use a memory limit and so there were no spills and this code path wasn't exercised. I did try running the benchmarks with memory limits on, but then I hit […]. Either way, I'll add this check now even without doing benchmarking on it because it seems it can only help.
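
As a side note on the `sorted = false` passed to `sort_batch_stream` after concatenation above: a tiny standalone illustration (plain Rust, not Arrow) of why batches that are each sorted still need a sort (or a k-way merge) once concatenated.

```rust
fn main() {
    let a = vec![1, 3, 5]; // sorted batch
    let b = vec![2, 4, 6]; // sorted batch
    // Simple concatenation preserves neither batch's ordering guarantee globally.
    let concatenated: Vec<i32> = a.iter().chain(b.iter()).copied().collect();
    assert_eq!(concatenated, vec![1, 3, 5, 2, 4, 6]); // not globally sorted
    // A sort (or a merge of the two sorted runs) restores the order.
    let mut sorted = concatenated.clone();
    sorted.sort_unstable();
    assert_eq!(sorted, vec![1, 2, 3, 4, 5, 6]);
}
```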
||
| } | ||
|
|
||
| let streams = std::mem::take(&mut self.in_mem_batches) | ||
| .into_iter() | ||
| .map(|batch| { | ||
| .map(|(sorted, batch)| { | ||
| let metrics = self.metrics.baseline.intermediate(); | ||
| Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1)) | ||
| Ok(spawn_buffered( | ||
| self.sort_batch_stream(batch, sorted, metrics)?, | ||
| 1, | ||
| )) | ||
| }) | ||
| .collect::<Result<_>>()?; | ||
|
|
||
|
|
@@ -492,27 +529,34 @@ impl ExternalSorter { | |
| fn sort_batch_stream( | ||
| &self, | ||
| batch: RecordBatch, | ||
| sorted: bool, | ||
| metrics: BaselineMetrics, | ||
| ) -> Result<SendableRecordBatchStream> { | ||
| let schema = batch.schema(); | ||
|
|
||
| let mut reservation = | ||
| MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id)) | ||
| .register(&self.runtime.memory_pool); | ||
|
|
||
| // TODO: This should probably be try_grow (#5885) | ||
| reservation.resize(batch.get_array_memory_size()); | ||
|
|
||
| let fetch = self.fetch; | ||
| let expressions = self.expr.clone(); | ||
| let stream = futures::stream::once(futures::future::lazy(move |_| { | ||
| let sorted = sort_batch(&batch, &expressions, fetch)?; | ||
| metrics.record_output(sorted.num_rows()); | ||
| drop(batch); | ||
| reservation.free(); | ||
| Ok(sorted) | ||
| })); | ||
| Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) | ||
| if !sorted { | ||
| // Reserve some memory for sorting the batch | ||
| let mut reservation = | ||
| MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id)) | ||
| .register(&self.runtime.memory_pool); | ||
|
|
||
| // TODO: This should probably be try_grow (#5885) | ||
| reservation.resize(batch.get_array_memory_size()); | ||
|
|
||
| let fetch = self.fetch; | ||
| let expressions = self.expr.clone(); | ||
| let stream = futures::stream::once(futures::future::lazy(move |_| { | ||
| let output = sort_batch(&batch, &expressions, fetch)?; | ||
| metrics.record_output(output.num_rows()); | ||
| drop(batch); | ||
| reservation.free(); | ||
| Ok(output) | ||
| })); | ||
| Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) | ||
| } else { | ||
| let stream = futures::stream::once(futures::future::lazy(move |_| Ok(batch))); | ||
| Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,76 +30,85 @@ use datafusion::physical_plan::sorts::sort::SortExec; | |
| use datafusion::physical_plan::{collect, ExecutionPlan}; | ||
| use datafusion::prelude::{SessionConfig, SessionContext}; | ||
| use rand::Rng; | ||
| use rstest::rstest; | ||
| use std::sync::Arc; | ||
| use test_utils::{batches_to_vec, partitions_to_sorted_vec}; | ||
|
|
||
| #[tokio::test] | ||
| #[cfg_attr(tarpaulin, ignore)] | ||
| async fn test_sort_1k_mem() { | ||
| run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| #[cfg_attr(tarpaulin, ignore)] | ||
| async fn test_sort_100k_mem() { | ||
| run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_sort_unlimited_mem() { | ||
| run_sort( | ||
| usize::MAX, | ||
| vec![(5, false), (2000, false), (1000000, false)], | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| /// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` | ||
| async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { | ||
| for (size, spill) in size_spill { | ||
| let input = vec![make_staggered_batches(size)]; | ||
| let first_batch = input | ||
| .iter() | ||
| .flat_map(|p| p.iter()) | ||
| .next() | ||
| .expect("at least one batch"); | ||
| let schema = first_batch.schema(); | ||
|
|
||
| let sort = vec![PhysicalSortExpr { | ||
| expr: col("x", &schema).unwrap(), | ||
| options: SortOptions { | ||
| descending: false, | ||
| nulls_first: true, | ||
| }, | ||
| }]; | ||
|
|
||
| let exec = MemoryExec::try_new(&input, schema, None).unwrap(); | ||
| let sort = Arc::new(SortExec::new(sort, Arc::new(exec))); | ||
|
|
||
| let runtime_config = RuntimeConfig::new() | ||
| .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); | ||
| let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); | ||
| let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); | ||
|
|
||
| let task_ctx = session_ctx.task_ctx(); | ||
| let collected = collect(sort.clone(), task_ctx).await.unwrap(); | ||
|
|
||
| let expected = partitions_to_sorted_vec(&input); | ||
| let actual = batches_to_vec(&collected); | ||
|
|
||
| if spill { | ||
| assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); | ||
| } else { | ||
| assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); | ||
| } | ||
| #[rstest] | ||
|
Contributor
❤️
||
| #[case::mem_10k_5_rows(10240, 5, None, false)] | ||
| #[case::mem_10k_20k_rows(10240, 20000, None, true)] | ||
| #[case::mem_10k_1m_rows(10240, 1000000, None, true)] | ||
| #[case::mem_10k_5_rows_fetch_1(10240, 5, Some(1), false)] | ||
| #[case::mem_10k_20k_rows_fetch_1(10240, 20000, Some(1), false)] | ||
| #[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), false)] | ||
| #[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)] | ||
| #[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)] | ||
| #[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)] | ||
| #[case::mem_100k_5_rows(102400, 5, None, false)] | ||
| #[case::mem_100k_20k_rows(102400, 20000, None, false)] | ||
| #[case::mem_100k_1m_rows(102400, 1000000, None, true)] | ||
| #[case::mem_100k_5_rows_fetch_1(102400, 5, Some(1), false)] | ||
| #[case::mem_100k_20k_rows_fetch_1(102400, 20000, Some(1), false)] | ||
| #[case::mem_100k_1m_rows_fetch_1(102400, 1000000, Some(1), false)] | ||
| #[case::mem_100k_5_rows_fetch_1000(102400, 5, Some(1000), false)] | ||
| #[case::mem_100k_20k_rows_fetch_1000(102400, 20000, Some(1000), false)] | ||
| #[case::mem_100k_1m_rows_fetch_1000(102400, 1000000, Some(1000), false)] | ||
| #[case::mem_inf_5_rows(usize::MAX, 5, None, false)] | ||
| #[case::mem_inf_20k_rows(usize::MAX, 20000, None, false)] | ||
| #[case::mem_inf_1m_rows(usize::MAX, 1000000, None, false)] | ||
| #[tokio::test] | ||
| async fn test_sort_spill( | ||
| #[case] pool_size: usize, | ||
| #[case] size: usize, | ||
| #[case] fetch: Option<usize>, | ||
| #[case] spill: bool, | ||
| ) { | ||
| let input = vec![make_staggered_batches(size)]; | ||
| let first_batch = input | ||
| .iter() | ||
| .flat_map(|p| p.iter()) | ||
| .next() | ||
| .expect("at least one batch"); | ||
| let schema = first_batch.schema(); | ||
|
|
||
| let sort = vec![PhysicalSortExpr { | ||
| expr: col("x", &schema).unwrap(), | ||
| options: SortOptions { | ||
| descending: false, | ||
| nulls_first: true, | ||
| }, | ||
| }]; | ||
|
|
||
| let exec = MemoryExec::try_new(&input, schema, None).unwrap(); | ||
| let sort = Arc::new(SortExec::new(sort, Arc::new(exec)).with_fetch(fetch)); | ||
|
|
||
| let runtime_config = | ||
| RuntimeConfig::new().with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); | ||
| let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); | ||
| let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); | ||
|
|
||
| let task_ctx = session_ctx.task_ctx(); | ||
| let collected = collect(sort.clone(), task_ctx).await.unwrap(); | ||
|
|
||
| let mut expected = partitions_to_sorted_vec(&input); | ||
| if let Some(k) = fetch { | ||
| expected = expected.into_iter().take(k).collect(); | ||
| } | ||
| let actual = batches_to_vec(&collected); | ||
|
|
||
| assert_eq!( | ||
| session_ctx.runtime_env().memory_pool.reserved(), | ||
| 0, | ||
| "The sort should have returned all memory used back to the memory pool" | ||
| ); | ||
| assert_eq!(expected, actual, "failure in @ pool_size {pool_size}"); | ||
| if spill { | ||
| assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); | ||
| } else { | ||
| assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); | ||
| } | ||
|
|
||
| assert_eq!( | ||
| session_ctx.runtime_env().memory_pool.reserved(), | ||
| 0, | ||
| "The sort should have returned all memory used back to the memory pool" | ||
| ); | ||
| assert_eq!(expected, actual, "failure in @ pool_size {pool_size}"); | ||
| } | ||
|
|
||
| /// Return randomly sized record batches in a field named 'x' of type `Int32` | ||
|
|
||
I'm thinking: can we make the heuristic `f < input.num_rows() / 10`, or some other magic number, so that we only do the eager sort for small `k`s?

I was also thinking something similar, but along the lines of `f < input.num_rows() && f <= 100`, so that we effectively have a hard cut-off for eager sorting after 100 rows.
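
A quick sketch of the two cut-off heuristics floated above, written as standalone predicates. The function names and the thresholds (`10`, `100`) are just the illustrative values from the comments, not anything decided in the PR.

```rust
/// "f < input.num_rows() / 10": only eager-sort when k is small relative to the batch.
fn should_eager_sort_ratio(fetch: Option<usize>, num_rows: usize) -> bool {
    fetch.map_or(false, |f| f < num_rows / 10)
}

/// "f < input.num_rows() && f <= 100": hard cut-off for eager sorting after 100 rows.
fn should_eager_sort_capped(fetch: Option<usize>, num_rows: usize) -> bool {
    fetch.map_or(false, |f| f < num_rows && f <= 100)
}

fn main() {
    assert!(should_eager_sort_ratio(Some(5), 1000));
    assert!(!should_eager_sort_ratio(Some(500), 1000));
    assert!(should_eager_sort_capped(Some(100), 1000));
    assert!(!should_eager_sort_capped(Some(101), 1000));
}
```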