feat: Push down hashes to probe side in HashJoinExec #17529
Conversation
//! ```
//! The join portion of the query should look something like this:
//!
//! ```text
@adriangb I think this is probably ready for an initial look when you get a chance! I plan on adding unit + fuzz tests as well, but let me know if you have any other thoughts re: testing.

@alamb Would you be able to kick off benchmarks 🙏🏾? Specifically TPC-H against Parquet files. We'd want the following configuration options set:
/// Each element represents the column bounds computed by one partition.
bounds: Vec<PartitionBounds>,
/// Hashes from the left (build) side, if enabled
left_hashes: NoHashSet<u64>,
I found using a HashSet here to yield better performance than using a Vec<Arc<dyn JoinHashMapType>>, though it does of course result in extra allocations.
My guess is that this is primarily because Vec<Arc<dyn JoinHashMapType>> results in more indirection and scattered memory accesses, which likely means worse cache locality.
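For context, the `NoHashSet<u64>` above is presumably a `HashSet` whose keys are already hash values, so it can skip re-hashing them. A minimal sketch of such a type, assuming an identity hasher (the PR's actual definition may differ):

```rust
use std::collections::HashSet;
use std::hash::{BuildHasherDefault, Hasher};

/// Pass-through hasher: the u64 keys are already hashes, so re-hashing them
/// would be wasted work. (Hypothetical sketch, not necessarily the PR's code.)
#[derive(Default)]
struct IdentityHasher(u64);

impl Hasher for IdentityHasher {
    fn finish(&self) -> u64 {
        self.0
    }
    fn write(&mut self, _bytes: &[u8]) {
        unimplemented!("only u64 keys are expected")
    }
    fn write_u64(&mut self, value: u64) {
        self.0 = value;
    }
}

type NoHashSet<T> = HashSet<T, BuildHasherDefault<IdentityHasher>>;
```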
I worry that the extra memory cost is prohibitively expensive: there are going to be queries that ran just fine previously but now OOM.
Since this is opt-in, hopefully it's not as much of an issue? I've mentioned this in the config documentation:
/// When set to true, hash joins will allow passing hashes from the build
/// side to the right side of the join. This can be useful to prune rows early on,
/// but may consume more memory.
In general though I agree that we shouldn't need extra allocations here. It gets a bit tricky because even if we combine all the hash tables from each build partition into a single, shareable table, each probe (stream) partition needs to be able to validate that the lookup is localized to its partition. Otherwise we'll see duplicate / incorrect results, I believe.
Though, it may just take some tweaking to the existing data structure. Haven't thought about it enough.
I think the way you'd do it is something like (col in hash_table_1) OR (col in hash_table_2) OR (...)
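In code that boils down to a membership test against each build partition's table in turn; sketched here with plain `HashSet`s since the exact `JoinHashMapType` lookup API isn't shown in this thread:

```rust
use std::collections::HashSet;

/// A probe-side hash may match if any build partition's table contains it.
fn may_match(per_partition_hashes: &[HashSet<u64>], hash: u64) -> bool {
    per_partition_hashes.iter().any(|table| table.contains(&hash))
}
```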
Ah yeah that's essentially the same as what I was referring to earlier with using a Vec<Arc<dyn JoinHashMapType>> (sorry probably should've clarified more).
@LiaCastaneda do you mean because of hash collisions?
I was thinking because localized lookups would be more efficient. IIUC, probe partition 0 should only check build partition 0's hash table, and so on. The problem is that on the probe side, in the evaluate() function (when the dynamic filter runs), we don't have information about which partition the batch belongs to.
I wonder if we can compute this "routing" using the RepartitionExec hashing to figure out the partition, then use the join's hashing for the actual hash lookup. I tried this two-hash approach branching off this PR and have something here; I think it returns correct results while using less memory in most cases (e.g., Q9 and Q18 save ~600MB each). @rkrishn7 feel free to take it if it helps or if it even makes sense :) I was just trying things out here.
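A rough, self-contained sketch of that two-hash routing idea, using std's hashing in place of DataFusion's `create_hashes` on Arrow arrays (the struct and method names here are illustrative, not the branch's actual API):

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashSet;
use std::hash::BuildHasher;

/// One set of build-side hashes per build partition, plus two seeds: the
/// repartitioning seed picks which partition's set to consult, and the join
/// seed produces the hash used for the membership test.
struct ProbeSideHashFilter {
    partition_hashes: Vec<HashSet<u64>>,
    repartition_state: RandomState,
    join_state: RandomState,
}

impl ProbeSideHashFilter {
    fn may_match(&self, key: &str) -> bool {
        // Route to the build partition this key would have been sent to by
        // RepartitionExec-style hash partitioning (same seed/columns assumed).
        let partition =
            (self.repartition_state.hash_one(key) as usize) % self.partition_hashes.len();
        // Then test membership using the join's hash of the same key.
        let join_hash = self.join_state.hash_one(key);
        self.partition_hashes[partition].contains(&join_hash)
    }
}
```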
Thanks @LiaCastaneda!
Yeah, so this is pretty much exactly what I had done previously. Instead of scanning through the Vec<Arc<dyn JoinHashMapType>>, we can leverage the fact that we know the hashing method downstream in RepartitionExec, and thus use the same seed/columns used to compute the hash for distributing across partitions.
But even though we get O(1) lookup in this approach, I found it to be not as performant as the single HashSet approach. My thinking is this occurs primarily because we're probing any one of the N HashMaps on the probe side, so it likely exhibits much worse cache efficiency than a single HashSet. I'm not sure if this aligns with your measurements.
But I do think @adriangb's thinking is correct in his comment here. Even if this approach is slightly less performant than allocating an entire new HashSet, it probably wins just on account of no extra memory overhead.
Interestingly, I had thought the HashSet approach yielded substantially better results, but looking at the results in your branch @LiaCastaneda they seem to be somewhat comparable? I will test again locally today so we have another comparison.
> I'm not sure if this aligns with your measurements

I think, in terms of latency, the results were similar. The key improvement I noticed was in memory usage: for instance, Q18 has all distinct left-side values, making it the heaviest in terms of memory. There's a difference from 5.1 GB down to 4.4 GB.
Ran your commit locally @LiaCastaneda and can confirm I see similar results as well! Strange - I thought I was doing essentially the same thing in my comparisons from a while ago 🤔 . Only difference then is I was running on my beefier Linux computer which I don't have at the moment, but I definitely could have just missed something.
Anyhow, since results look similar I propose we move forward with the approach of reusing the hash table(s)! I can apply your patch to this branch over the weekend if that sounds good. And thanks again for putting that up 🙌🏾
}

impl SharedBuildAccumulator {
    /// Creates a new [SharedBuildAccumulator] configured for the given partition mode
- /// Creates a new [SharedBuildAccumulator] configured for the given partition mode
+ /// Creates a new [`SharedBuildAccumulator`] configured for the given partition mode
Very cool! Incidentally, we were just discussing today with @gabotechs and @robtandy how to make HashJoin dynamic filter pushdown more compatible with distributed DataFusion and how to eliminate the latency associated with waiting until we have the full build side to create filters. One idea that came up was to push something like:

But for this PR the big question in my mind is going to be: is the cost of the extra evaluation of the hash worth it?
I am not quite sure how to set these options in the benchmarks...

Ah okay, then we probably won't see any changes since these need to be enabled for the changes here to take effect. Posting a comparison I did locally:
🤖: Benchmark completed
@rkrishn7 sorry if I haven't looped back here. I went on a bit of a tangent exploring #17632 and then had some vacation and a team offsite. This is overall very exciting work that I think will help a lot of people. My main concern with this change is the overhead and making a CPU / memory tradeoff decision for users. I think we might be able to ship it as an experimental thing with the feature flag defaulting to false as you've done in this PR, but long term I worry that an extra 8 GB of RAM consumed might be too much. Do you have any numbers on how fast and how much RAM these 3 different scenarios use for some queries? I don't mean to ask you to run them all, but I do remember you mentioning you have already.
My hypothesis is that the table will look something like this:
If that were the case, which is just a guess at this point, making a query 10x faster with no extra memory use is easy to justify; everyone wants that! Choosing to make some queries, say, 11x faster for 2x memory use is harder to justify. If the performance difference is larger and we think it is justified in some cases, maybe we can at least try to reserve the extra memory and fall back to re-using the existing hash tables? I also think it's worth thinking about integrating your suggestion from our conversation to use an
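Going back to the reserve-then-fall-back idea above: a minimal sketch, building on the `try_grow` call that already appears in this PR's diff. The enum, function, and the use of plain `Vec<u64>`s for per-partition hashes are illustrative stand-ins (in the real code the fallback would reuse the existing `JoinHashMapType` tables):

```rust
use std::collections::HashSet;

use datafusion_execution::memory_pool::MemoryReservation;

/// Either a dedicated merged set of build-side hashes (extra memory, fast
/// probes) or a fallback that keeps whatever the build side already produced.
enum ProbeSideHashes {
    Dedicated(HashSet<u64>),
    Reused(Vec<Vec<u64>>),
}

fn build_probe_hashes(
    reservation: &mut MemoryReservation,
    estimated_additional_size: usize,
    per_partition_hashes: Vec<Vec<u64>>,
) -> ProbeSideHashes {
    match reservation.try_grow(estimated_additional_size) {
        Ok(()) => {
            // Budget granted: merge everything into one set for fast probes.
            let total: usize = per_partition_hashes.iter().map(Vec::len).sum();
            let mut merged = HashSet::with_capacity(total);
            for hashes in &per_partition_hashes {
                merged.extend(hashes.iter().copied());
            }
            ProbeSideHashes::Dedicated(merged)
        }
        // Budget denied: don't fail the query, reuse what already exists.
        Err(_) => ProbeSideHashes::Reused(per_partition_hashes),
    }
}
```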
Never mind, I see discussion in #17171
👋 I was playing around with this feature today, here are some results for SF1 and SF10 (Claude did these nice summaries). In any case, with some manual logging for Q18 (which appears to be the heaviest join), I'm seeing 1.5M hashes for SF1 and 15M hashes for SF10 (just realized it's because all the build-side values are distinct). The correlation between more data and more memory is clear, but IMO, if there's a way to measure the total number of hashes across partitions, could we opt out of the feature during runtime and allow the user to configure this based on their available resources / pod size?
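That runtime opt-out could be as simple as comparing the total hash count against a user-configured cap; a tiny sketch with made-up names:

```rust
/// Hypothetical guard: only push build-side hashes to the probe side when the
/// total count across partitions stays under a user-configured limit.
fn should_push_down_hashes(hashes_per_partition: &[usize], max_pushed_hashes: usize) -> bool {
    let total: usize = hashes_per_partition.iter().sum();
    total <= max_pushed_hashes
}
```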
create_hashes(&expr_values, self.random_state, &mut hashes_buffer)?;

// Create a boolean array where each position indicates if the corresponding hash is in the set of known hashes
let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
Should be able to use MutableBuffer::collect_bool instead of setting bits.
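Something along these lines, reusing `num_rows` and `hashes_buffer` from the snippet above and assuming `left_hashes` is the shared set of build-side hashes; a sketch of the suggestion rather than a drop-in patch:

```rust
use arrow::array::BooleanArray;
use arrow::buffer::{BooleanBuffer, MutableBuffer};

// Build the bitmap in one pass instead of zero-initializing a buffer and
// setting individual bits afterwards.
let buf = MutableBuffer::collect_bool(num_rows, |i| left_hashes.contains(&hashes_buffer[i]));
let result = BooleanArray::new(BooleanBuffer::new(buf.into(), 0, num_rows), None);
```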
let estimated_additional_size =
    estimate_memory_size::<u64>(left_hash_map.num_hashes(), fixed_size)?;
inner.reservation.try_grow(estimated_additional_size)?;
inner.left_hashes.extend(left_hash_map.hashes());
Can we maybe derive the capacity for left_hashes upfront to reduce the cost of building the map?
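For example, on top of the snippet above (assuming `left_hashes` is a std-like set with a `reserve` method and `num_hashes()` is a cheap count, as used in the PR):

```rust
// Reserve space for the incoming hashes once, instead of letting the set
// grow (and re-hash its contents) repeatedly while extending.
inner.left_hashes.reserve(left_hash_map.num_hashes());
inner.left_hashes.extend(left_hash_map.hashes());
```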
Benchmarks look promising, but I think we should be able to reduce/remove regressions by avoiding the overhead of creating a new
Thanks for the reviews and testing @adriangb @LiaCastaneda @Dandandan! Apologies, I haven't had time to look at this the past couple weeks. I'll be able to give this more attention next week when I'm back home. Still very excited about this work and looking forward to addressing everyone's feedback/suggestions!
Which issue does this PR close?
What changes are included in this PR?
- A new configuration option (`hash_join_sideways_hash_passing`) to enable passing hashes from the build side to probe-side scans.
- A `HashComparePhysicalExpr` that is pushed down to supported right-side scans in hash join.

Are these changes tested?
Not yet. Plan to add unit + fuzz tests
Are there any user-facing changes?
Yes, new configuration option for hash join execution.