Description
Is your feature request related to a problem or challenge?
While working on #7179, I noticed a potential improvement.
The key observation is that merging K sorted streams with a total of N rows:
- takes time proportional to O(N*K) when each output row is chosen by comparing the heads of all K streams
- is a single-threaded operation

K is often called the "fan-in" of the merge.
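To make the O(N*K) cost model concrete, here is a minimal Rust sketch (not DataFusion code) of a K-way merge that picks every output row by comparing the heads of all K runs. Plain `i64` values stand in for rows, and `naive_kway_merge` is a hypothetical name. Each output row costs up to K-1 comparisons, and the whole loop runs on one thread.

```rust
/// Merge K sorted runs by scanning the head of every run for each output row.
/// Returns the merged rows plus the number of head-to-head comparisons made.
/// Plain `i64` values stand in for rows; this is not DataFusion code.
fn naive_kway_merge(runs: &[Vec<i64>]) -> (Vec<i64>, usize) {
    let total: usize = runs.iter().map(|r| r.len()).sum();
    let mut cursors = vec![0usize; runs.len()]; // next unread index in each run
    let mut out = Vec::with_capacity(total);
    let mut comparisons = 0usize;

    while out.len() < total {
        // Scan all K heads to find the smallest one: up to K-1 comparisons.
        let mut best: Option<usize> = None;
        for (i, run) in runs.iter().enumerate() {
            if cursors[i] == run.len() {
                continue; // this run is exhausted
            }
            let is_better = match best {
                None => true,
                Some(b) => {
                    comparisons += 1;
                    run[cursors[i]] < runs[b][cursors[b]]
                }
            };
            if is_better {
                best = Some(i);
            }
        }
        // Fewer than `total` rows were emitted, so some run still has rows.
        let b = best.expect("some run still has rows");
        out.push(runs[b][cursors[b]]);
        cursors[b] += 1;
    }
    (out, comparisons)
}
```

With K runs the comparison count is bounded by N*(K-1), so doubling the fan-in roughly doubles the work per output row.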
The implementation of `ExternalSorter::in_mem_sort_stream` effectively merges all the buffered batches at once, as shown below. This can be a very large fan-in: 100s or 1000s of `RecordBatch`es.
```
┌─────┐                ┌─────┐
│  2  │                │  1  │
│  3  │                │  2  │
│  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ─ ─ ┐
│  4  │                │  3  │                │
│  2  │                │  4  │                │
└─────┘                └─────┘                │
┌─────┐                ┌─────┐                │
│  1  │                │  1  │                │
│  4  │─ ─▶  sort  ─ ─▶│  1  │─ ─ ─ ─ ─ ─ ─ ─ ┤
│  1  │                │  4  │                │
└─────┘                └─────┘                │
  ...                    ...                  ▼
Could be 100s depending on                  merge ─ ─ ─ ─ ─▶ sorted output
the data being sorted                         ▲              stream
  ...                    ...                  │
┌─────┐                ┌─────┐                │
│  3  │                │  1  │                │
│  1  │─ ─▶  sort  ─ ─▶│  3  │─ ─ ─ ─ ─ ─ ─ ─ ┤
└─────┘                └─────┘                │
┌─────┐                ┌─────┐                │
│  4  │                │  3  │                │
│  3  │─ ─▶  sort  ─ ─▶│  4  │─ ─ ─ ─ ─ ─ ─ ─ ┘
└─────┘                └─────┘

in_mem_batches
```
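The flow in the diagram can be sketched as follows, assuming each buffered batch is a `Vec<i64>` rather than an Arrow `RecordBatch`; `heap_merge` and `sort_then_merge_all` are hypothetical names, not DataFusion functions. This version uses a binary min-heap (a loser tree would work similarly), so each output row costs O(log K) comparisons instead of K-1, but the merge is still one single-threaded loop whose fan-in equals the number of buffered batches.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Single-threaded K-way merge of sorted runs using a min-heap of run heads.
fn heap_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize))); // (value, run index, position)
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum::<usize>());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        // Refill the heap with the next row from the run we just consumed.
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Hypothetical stand-in for the in-memory sort: sort every buffered batch,
/// then merge them all at once, so the fan-in equals the number of batches.
fn sort_then_merge_all(in_mem_batches: Vec<Vec<i64>>) -> Vec<i64> {
    let sorted_runs: Vec<Vec<i64>> = in_mem_batches
        .into_iter()
        .map(|mut batch| {
            batch.sort_unstable();
            batch
        })
        .collect();
    heap_merge(&sorted_runs)
}
```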
Describe the solution you'd like
A classical approach to such sorts is a "cascaded merge", which uses a series of merge operations, each with a limited fan-in (e.g. 10 inputs).
```
┌─────┐                ┌─────┐
│  2  │                │  1  │
│  3  │                │  2  │
│  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ─ ─┐
│  4  │                │  3  │               │
│  2  │                │  4  │               │
└─────┘                └─────┘               │
┌─────┐                ┌─────┐               │
│  1  │                │  1  │               ▼
│  4  │─ ─▶  sort  ─ ─▶│  1  │─ ─ ─ ─ ─ ─▶ merge ─ ─ ─ ─ ┐
│  1  │                │  4  │                           │
└─────┘                └─────┘                           │
  ...                    ...                ...          ▼
                                                       merge ─ ─ ─ ─▶ sorted output
                                                         ▲            stream
  ...                    ...                ...          │
┌─────┐                ┌─────┐                           │
│  3  │                │  1  │                           │
│  1  │─ ─▶  sort  ─ ─▶│  3  │─ ─ ─ ─ ─ ─▶ merge ─ ─ ─ ─ ┘
└─────┘                └─────┘               ▲
┌─────┐                ┌─────┐               │
│  4  │                │  3  │               │
│  3  │─ ─▶  sort  ─ ─▶│  4  │─ ─ ─ ─ ─ ─ ─ ─┘
└─────┘                └─────┘

in_mem_batches                do a series of merges that
                              each has a limited fan-in
                              (number of inputs)
```
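A cascaded merge with a bounded fan-in can be sketched by repeatedly merging groups of at most `max_fan_in` sorted runs until one run remains. This is a hypothetical, single-threaded illustration over sorted `Vec<i64>` runs, not DataFusion's implementation; `cascaded_merge` and `heap_merge` are made-up names. The returned level count is the number of times each row passes through a merge.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Single-threaded K-way merge of sorted runs using a min-heap of run heads.
fn heap_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize))); // (value, run index, position)
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum::<usize>());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Hypothetical cascaded merge: repeatedly merge groups of at most
/// `max_fan_in` sorted runs until a single run remains. Returns the merged
/// rows and the number of merge levels.
fn cascaded_merge(mut runs: Vec<Vec<i64>>, max_fan_in: usize) -> (Vec<i64>, usize) {
    assert!(max_fan_in >= 2, "fan-in must be at least 2 to make progress");
    let mut levels = 0;
    while runs.len() > 1 {
        // Each chunk becomes one merge with fan-in <= max_fan_in.
        runs = runs.chunks(max_fan_in).map(heap_merge).collect();
        levels += 1;
    }
    (runs.pop().unwrap_or_default(), levels)
}
```

For example, 1000 single-row runs with `max_fan_in = 10` take 3 levels (1000 → 100 → 10 → 1), so each row is merged three times instead of once.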
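This is often better because:
- With a fan-in limit F, each row passes through about log_F(K) merges, so under the same cost model the total work is roughly O(N*F*log_F(K)) instead of O(N*K). There is some additional overhead because the same row must now be compared (and copied) once per merge level.
- The intermediate merges can run in parallel on multiple cores (though the final merge is still single-threaded).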
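The parallelism point can be illustrated with scoped threads from the Rust standard library (`std::thread::scope`, stable since Rust 1.63): every level except the last merges its groups on separate threads, and the final merge of at most `max_fan_in` runs happens on the calling thread. This is a sketch under the same assumptions as before (sorted `Vec<i64>` runs standing in for batches, hypothetical function names), not a proposal for DataFusion's actual API.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Single-threaded K-way merge of sorted runs using a min-heap of run heads.
fn heap_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize))); // (value, run index, position)
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum::<usize>());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Hypothetical cascaded merge where each intermediate level merges its
/// groups of at most `max_fan_in` runs on separate scoped threads.
fn parallel_cascaded_merge(mut runs: Vec<Vec<i64>>, max_fan_in: usize) -> Vec<i64> {
    assert!(max_fan_in >= 2, "fan-in must be at least 2 to make progress");
    while runs.len() > max_fan_in {
        let next_level: Vec<Vec<i64>> = thread::scope(|s| {
            // One thread per group; groups at the same level are independent.
            let handles: Vec<_> = runs
                .chunks(max_fan_in)
                .map(|group| s.spawn(move || heap_merge(group)))
                .collect();
            handles
                .into_iter()
                .map(|h| h.join().expect("merge thread panicked"))
                .collect()
        });
        runs = next_level;
    }
    // The last merge has fan-in <= max_fan_in and is still single-threaded.
    heap_merge(&runs)
}
```

Spawning one OS thread per group keeps the sketch short; a real implementation would more likely schedule these merges on an existing thread pool or async runtime.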
It would be awesome if someone wanted to:
- Verify the theory that large sorts have a large fan-in
- Implement a cascaded merge and measure if it improves performance
The sort benchmark (TODO) (thanks @jaylmiller!) may be interesting:
```shell
cargo run --release --bin parquet -- sort --path ./data --scale-factor 1.0
```
Describe alternatives you've considered
Another potential variation might be to get more cores involved in the merging by parallelizing the merge itself, as described in the Morsel-Driven Parallelism paper.
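One simple way to parallelize a single merge is to split the key space at sorted splitter values, merge each key range independently on its own thread, and concatenate the results in range order. This range-partitioning scheme is chosen here for illustration and is not the algorithm from the Morsel-Driven Parallelism paper; `range_partitioned_merge` is a hypothetical name, and choosing good splitters (for example by sampling the runs) is left to the caller.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// K-way merge of sorted slices using a min-heap of slice heads.
fn heap_merge(runs: &[&[i64]]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize))); // (value, slice index, position)
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(|r| r.len()).sum::<usize>());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Hypothetical range-partitioned merge: key range p covers
/// [splitters[p-1], splitters[p]), with open ends at both extremes.
fn range_partitioned_merge(runs: &[Vec<i64>], splitters: &[i64]) -> Vec<i64> {
    assert!(splitters.windows(2).all(|w| w[0] <= w[1]), "splitters must be sorted");
    let num_ranges = splitters.len() + 1;

    // For each run, the start index of every key range, plus the run length.
    let cuts: Vec<Vec<usize>> = runs
        .iter()
        .map(|run| {
            let mut c = vec![0];
            c.extend(splitters.iter().map(|&split| run.partition_point(|&x| x < split)));
            c.push(run.len());
            c
        })
        .collect();

    thread::scope(|s| {
        // Each key range is merged independently on its own thread.
        let handles: Vec<_> = (0..num_ranges)
            .map(|p| {
                let slices: Vec<&[i64]> = runs
                    .iter()
                    .zip(&cuts)
                    .map(|(run, c)| &run[c[p]..c[p + 1]])
                    .collect();
                s.spawn(move || heap_merge(&slices))
            })
            .collect();
        // Ranges are disjoint and ordered, so concatenation yields sorted output.
        let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum::<usize>());
        for h in handles {
            out.extend(h.join().expect("merge thread panicked"));
        }
        out
    })
}
```

Because the key ranges are disjoint and concatenated in order, no final single-threaded merge is needed; the trade-off is that skewed data can leave one range, and therefore one thread, with most of the work.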
Additional context
No response