Use rapidsmpf ShufflerAsync #20701
Conversation
    ir_context=ir_context,
)
# Reserve shuffle IDs for the entire pipeline execution
with ReserveOpIDs(ir) as shuffle_id_map:
Most of the changes in this file correspond to the extra indentation needed for this ReserveOpIDs context. We also pass the shuffle_id_map into generate_network (and it is added to GenState for pipeline construction). For multi-GPU execution, we need to reserve the shuffle IDs ahead of time, so we might as well make that change now.
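The idea described above can be sketched as a small context manager that assigns every shuffle node an ID up front and yields the resulting mapping. This is an illustrative sketch only; the class name ReserveOpIDs comes from the diff, but the constructor signature, the string-keyed nodes, and the counter-based ID scheme here are assumptions, not the actual cudf-polars implementation.

```python
import itertools


class ReserveOpIDs:
    """Reserve a unique operation ID for each shuffle node up front."""

    _counter = itertools.count()

    def __init__(self, nodes):
        self.nodes = nodes
        self.id_map = {}

    def __enter__(self):
        # Assign IDs before any task runs, so every rank/worker can
        # agree on the same mapping during multi-GPU execution.
        for node in self.nodes:
            self.id_map[node] = next(self._counter)
        return self.id_map

    def __exit__(self, exc_type, exc, tb):
        # Release the reservations once the pipeline finishes.
        self.id_map.clear()
        return False


# Usage mirrors the diff: reserve IDs for the whole pipeline execution.
nodes = ["shuffle-a", "shuffle-b"]
with ReserveOpIDs(nodes) as shuffle_id_map:
    assert set(shuffle_id_map) == {"shuffle-a", "shuffle-b"}
```

Reserving eagerly (rather than handing out IDs lazily at runtime) is what makes the mapping deterministic across ranks.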
# Collect all Shuffle nodes.
# NOTE: We will also need to collect Repartition,
# and Join nodes to support multi-GPU execution.
self.shuffle_nodes: list[IR] = [
When we do support Repartition, will we store them in the same list (shuffle_nodes) / dictionary (shuffle_id_map) as the shuffle nodes? I'm wondering if we can easily future-proof the interface at all, so that we don't need a ton of noisy changes when we go to support other types.
Yeah, it's the same list. We can just call it collective_nodes to make that clearer.
I renamed everything to collective_id/collective_nodes and refactored some of the common logic into a separate file.
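The renaming above suggests a type-driven collection step that Repartition and Join can later join without further attribute renames. A minimal sketch of that idea follows; the IR classes here are stand-ins for the real cudf-polars IR types, and COLLECTIVE_TYPES / collect_collective_nodes are hypothetical names introduced for illustration.

```python
class IR:
    """Minimal stand-in for the cudf-polars IR base class."""


class Shuffle(IR):
    pass


class Repartition(IR):
    pass


class Join(IR):
    pass


# Node types that need a reserved collective ID. Supporting multi-GPU
# Repartition/Join later means extending this tuple, not renaming
# attributes throughout the codebase.
COLLECTIVE_TYPES: tuple[type, ...] = (Shuffle,)


def collect_collective_nodes(nodes: list[IR]) -> list[IR]:
    """Collect every node that participates in a collective operation."""
    return [node for node in nodes if isinstance(node, COLLECTIVE_TYPES)]


nodes = [Shuffle(), Repartition(), Shuffle()]
collective_nodes = collect_collective_nodes(nodes)
# Only the two Shuffle nodes qualify today.
assert len(collective_nodes) == 2
```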
self.shuffler.shutdown()
_release_shuffle_id(self.op_id)
"""Exit the ShuffleContext manager."""
del self.shuffler
Is shutting down the shuffler not sufficient anymore? I'm not a huge fan of changing the attributes available on an object (you need to know what context you're in to know whether it's safe to call some method).
And can you remind me what the actual state here that needs to be setup / torn down? Do we just need reliable shutdown of the ShufflerAsync?
We probably don't need this anymore. The original shuffler context was dynamically releasing the shuffle ID at runtime. This was working fine for single-GPU execution, but is no longer reliable.
Ok, one clarification: The shutdown method does not exist for ShufflerAsync (only for the synchronous Shuffler). It's not clear if we really gain much by explicitly cleaning up in an __exit__ definition, but I also don't think it "hurts". Let me know if I should change anything here.
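The point above can be illustrated with an `__exit__` that tolerates both shuffler flavors: it calls `shutdown()` only when the method exists (the synchronous Shuffler), and otherwise just drops its reference (ShufflerAsync). This is a hypothetical sketch; apart from the `shutdown` method name and the `del self.shuffler` / docstring lines quoted from the diff, the class and helper names are illustrative.

```python
class ShuffleContext:
    """Illustrative context manager wrapping a shuffler instance."""

    def __init__(self, shuffler):
        self.shuffler = shuffler

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        """Exit the ShuffleContext manager."""
        # Only the synchronous Shuffler exposes shutdown(); guard the
        # call so the same context also works with ShufflerAsync.
        shutdown = getattr(self.shuffler, "shutdown", None)
        if shutdown is not None:
            shutdown()
        # Drop the reference so the shuffler can be garbage collected.
        del self.shuffler
        return False


class FakeSyncShuffler:
    """Stand-in for the synchronous Shuffler."""

    def __init__(self):
        self.was_shut_down = False

    def shutdown(self):
        self.was_shut_down = True


class FakeAsyncShuffler:
    """Stand-in for ShufflerAsync: no shutdown() method."""


sync = FakeSyncShuffler()
with ShuffleContext(sync):
    pass
assert sync.was_shut_down

with ShuffleContext(FakeAsyncShuffler()):
    pass  # exits cleanly even without shutdown()
```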
Description

Now that rapidsai/rapidsmpf#685 is in, we can use ShufflerAsync in cudf-polars for the rapidsmpf runtime. This PR also makes some improvements to the ShuffleContext in preparation for multi-GPU support.

Checklist