Description
DataFusion Ray can be extended to leverage datasource integrations that are not built into DataFusion itself, but could be brought in as features, such as:
- popular table formats (like Delta Lake, Iceberg or Hudi)
- popular DBMSs + Flight SQL, as per the DataFusion Table Providers project
However, adding a custom table provider to a distributed DataFusion engine requires two things:
- registering the corresponding `TableProviderFactory` with the DataFusion `SessionContext`
- for integrations that define custom `ExecutionPlan` nodes, registering the corresponding `PhysicalExtensionCodec`s
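To make the first requirement concrete, here is a minimal, self-contained Rust sketch of what registering a table provider factory enables. The traits and `SessionContext` below are simplified stand-ins written for illustration, not DataFusion's actual API; the lookup by upper-cased `STORED AS` name is only loosely modelled on how DataFusion resolves `CREATE EXTERNAL TABLE`, and `DeltaTable`, `DeltaTableFactory` and `DELTATABLE` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's `TableProvider` (the real trait is far richer).
trait TableProvider {
    fn describe(&self) -> String;
}

// Hypothetical stand-in for `TableProviderFactory`: builds a provider from the
// LOCATION given in a `CREATE EXTERNAL TABLE` statement.
trait TableProviderFactory {
    fn create(&self, location: &str) -> Arc<dyn TableProvider>;
}

// Hypothetical Delta Lake integration.
struct DeltaTable {
    location: String,
}

impl TableProvider for DeltaTable {
    fn describe(&self) -> String {
        format!("delta table at {}", self.location)
    }
}

struct DeltaTableFactory;

impl TableProviderFactory for DeltaTableFactory {
    fn create(&self, location: &str) -> Arc<dyn TableProvider> {
        Arc::new(DeltaTable { location: location.to_string() })
    }
}

// Stand-in session context: factories are keyed by the upper-cased `STORED AS` name.
#[derive(Default)]
struct SessionContext {
    table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
}

impl SessionContext {
    fn register_table_factory(&mut self, stored_as: &str, factory: Arc<dyn TableProviderFactory>) {
        self.table_factories.insert(stored_as.to_uppercase(), factory);
    }

    // Resolves `CREATE EXTERNAL TABLE ... STORED AS <stored_as> LOCATION <location>`.
    fn create_external_table(
        &self,
        stored_as: &str,
        location: &str,
    ) -> Result<Arc<dyn TableProvider>, String> {
        self.table_factories
            .get(&stored_as.to_uppercase())
            .map(|factory| factory.create(location))
            .ok_or_else(|| format!("no table factory registered for {stored_as}"))
    }
}

fn main() {
    let mut ctx = SessionContext::default();
    // Without this registration, tables of this format cannot be created at all.
    ctx.register_table_factory("deltatable", Arc::new(DeltaTableFactory));

    let table = ctx.create_external_table("DELTATABLE", "s3://bucket/events").unwrap();
    println!("{}", table.describe()); // prints: delta table at s3://bucket/events
    assert!(ctx.create_external_table("ICEBERG", "s3://bucket/other").is_err());
}
```

Registration alone is not enough in a distributed engine when the integration also introduces custom plan nodes, which is what the second requirement addresses.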
The current code does not allow for such extensions, because:
- the DataFusion `SessionContext` (which is wrapped in a `PySessionContext`) is created outside the datafusion-ray library and can only be used via its Python interface (i.e. by invoking named methods on `PyAny` flavours of the session context and execution plan)
- the only extension codec used for plan serialization is the `ShuffleCodec`, which only handles datafusion-ray's own shuffle read/write nodes
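The codec limitation can be illustrated with a small stand-in: a codec that only recognises the shuffle nodes has no way to encode a custom node contributed by an integration, so a plan containing that node cannot be serialized. All types here are hypothetical simplifications (DataFusion's real `PhysicalExtensionCodec` has a different signature and also decodes), `ShuffleWriterExec`/`ShuffleReaderExec` merely stand in for datafusion-ray's shuffle nodes, and `DeltaScanExec` is an invented node.

```rust
use std::any::Any;

// Hypothetical, heavily simplified stand-in for DataFusion's `ExecutionPlan`.
trait ExecutionPlan {
    fn as_any(&self) -> &dyn Any;
}

// Stand-ins for datafusion-ray's shuffle write/read nodes.
struct ShuffleWriterExec {
    stage_id: usize,
}
struct ShuffleReaderExec {
    stage_id: usize,
}
// Stand-in for a custom node defined by a table-format integration.
struct DeltaScanExec {
    location: String,
}

impl ExecutionPlan for ShuffleWriterExec {
    fn as_any(&self) -> &dyn Any { self }
}
impl ExecutionPlan for ShuffleReaderExec {
    fn as_any(&self) -> &dyn Any { self }
}
impl ExecutionPlan for DeltaScanExec {
    fn as_any(&self) -> &dyn Any { self }
}

// Simplified stand-in for `PhysicalExtensionCodec` (encode side only).
trait PhysicalExtensionCodec {
    fn try_encode(&self, node: &dyn ExecutionPlan) -> Result<Vec<u8>, String>;
}

// The only codec available knows nothing beyond the shuffle nodes.
struct ShuffleCodec;

impl PhysicalExtensionCodec for ShuffleCodec {
    fn try_encode(&self, node: &dyn ExecutionPlan) -> Result<Vec<u8>, String> {
        let any = node.as_any();
        if let Some(writer) = any.downcast_ref::<ShuffleWriterExec>() {
            Ok(format!("shuffle_writer:{}", writer.stage_id).into_bytes())
        } else if let Some(reader) = any.downcast_ref::<ShuffleReaderExec>() {
            Ok(format!("shuffle_reader:{}", reader.stage_id).into_bytes())
        } else {
            Err("ShuffleCodec cannot encode this node".to_string())
        }
    }
}

fn main() {
    let codec = ShuffleCodec;
    assert!(codec.try_encode(&ShuffleWriterExec { stage_id: 1 }).is_ok());

    // A plan containing the integration's scan node cannot be serialized.
    let scan = DeltaScanExec { location: "s3://bucket/events".to_string() };
    println!("{:?}", codec.try_encode(&scan)); // prints: Err("ShuffleCodec cannot encode this node")
}
```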
The solution we came up with in our fork to address these limitations involves the following changes:
- Add (back) the Rust dependency on `datafusion-python` and provide a Python function that creates a `PySessionContext` from within `datafusion_ray` itself. This context can be customized with the enabled table factories and so on, and can also be downcast so that we can use the Rust reference directly.
  The "external" Python `datafusion.SessionContext()` will continue to be supported; it just won't support any of the "extensions" that datafusion-ray was compiled with (since we don't want to attempt unsafe downcasts).
- Add an `Extension` trait (not necessarily limited to table providers) that each such "extension" implements in order to:
  a. customize the `SessionContext` before it is returned to Python (e.g. registering table provider factories, catalog providers, etc.)
  b. provide the list of physical extension codecs required to serialize its custom physical plan nodes, if any
- Create a composite `Extensions` singleton that prepares the session contexts created by the new `datafusion_ray.extended_session_context()` Python function, and that also maintains a composite `PhysicalExtensionCodec` which delegates to the built-in shuffle codec as well as to any additional codecs provided by the enabled extensions.
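The following is a minimal sketch of how the proposed pieces could fit together, again using simplified stand-in types rather than DataFusion's real traits. The `Extension` trait with its two responsibilities, the `Extensions` singleton and the composite codec follow the proposal; `DeltaExtension`, `DeltaCodec`, `DeltaScanExec`, the method names `configure`, `codecs` and `session_context`, and the use of `OnceLock` for the singleton are assumptions made for illustration. The PyO3 binding that would expose this as `datafusion_ray.extended_session_context()` is omitted.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

// --- Hypothetical, simplified stand-ins for DataFusion types ---
trait ExecutionPlan { fn as_any(&self) -> &dyn Any; }
trait PhysicalExtensionCodec: Send + Sync {
    fn try_encode(&self, node: &dyn ExecutionPlan) -> Result<Vec<u8>, String>;
}
#[derive(Default)]
struct SessionContext {
    table_factories: HashMap<String, String>, // `STORED AS` name -> factory name
}

// Built-in shuffle node (stand-in) and the codec datafusion-ray always needs.
struct ShuffleWriterExec { stage_id: usize }
impl ExecutionPlan for ShuffleWriterExec { fn as_any(&self) -> &dyn Any { self } }
struct ShuffleCodec;
impl PhysicalExtensionCodec for ShuffleCodec {
    fn try_encode(&self, node: &dyn ExecutionPlan) -> Result<Vec<u8>, String> {
        match node.as_any().downcast_ref::<ShuffleWriterExec>() {
            Some(w) => Ok(format!("shuffle_writer:{}", w.stage_id).into_bytes()),
            None => Err("not a shuffle node".to_string()),
        }
    }
}

// The proposed `Extension` trait, implemented once per optional integration.
trait Extension: Send + Sync {
    // (a) customize the session context before it is returned to Python
    fn configure(&self, ctx: &mut SessionContext);
    // (b) codecs for the extension's custom physical plan nodes, if any
    fn codecs(&self) -> Vec<Arc<dyn PhysicalExtensionCodec>> { Vec::new() }
}

// Hypothetical Delta Lake extension: one custom node, its codec, one table factory.
struct DeltaScanExec { location: String }
impl ExecutionPlan for DeltaScanExec { fn as_any(&self) -> &dyn Any { self } }
struct DeltaCodec;
impl PhysicalExtensionCodec for DeltaCodec {
    fn try_encode(&self, node: &dyn ExecutionPlan) -> Result<Vec<u8>, String> {
        match node.as_any().downcast_ref::<DeltaScanExec>() {
            Some(scan) => Ok(format!("delta_scan:{}", scan.location).into_bytes()),
            None => Err("not a Delta scan node".to_string()),
        }
    }
}
struct DeltaExtension;
impl Extension for DeltaExtension {
    fn configure(&self, ctx: &mut SessionContext) {
        ctx.table_factories.insert("DELTATABLE".to_string(), "DeltaTableFactory".to_string());
    }
    fn codecs(&self) -> Vec<Arc<dyn PhysicalExtensionCodec>> {
        let codec: Arc<dyn PhysicalExtensionCodec> = Arc::new(DeltaCodec);
        vec![codec]
    }
}

// Composite codec: asks each codec in turn and keeps the first success.
struct CompositeCodec { codecs: Vec<Arc<dyn PhysicalExtensionCodec>> }
impl PhysicalExtensionCodec for CompositeCodec {
    fn try_encode(&self, node: &dyn ExecutionPlan) -> Result<Vec<u8>, String> {
        self.codecs
            .iter()
            .find_map(|codec| codec.try_encode(node).ok())
            .ok_or_else(|| "no registered codec can encode this node".to_string())
    }
}

// The composite `Extensions` singleton: shuffle codec first, then extension codecs.
struct Extensions { extensions: Vec<Box<dyn Extension>>, codec: CompositeCodec }
impl Extensions {
    fn new(extensions: Vec<Box<dyn Extension>>) -> Self {
        let shuffle: Arc<dyn PhysicalExtensionCodec> = Arc::new(ShuffleCodec);
        let mut codecs = vec![shuffle];
        codecs.extend(extensions.iter().flat_map(|ext| ext.codecs()));
        Extensions { extensions, codec: CompositeCodec { codecs } }
    }
    // Rust side of a Python-facing `extended_session_context()`.
    fn session_context(&self) -> SessionContext {
        let mut ctx = SessionContext::default();
        for ext in &self.extensions {
            ext.configure(&mut ctx);
        }
        ctx
    }
}

fn extensions() -> &'static Extensions {
    static EXTENSIONS: OnceLock<Extensions> = OnceLock::new();
    // In practice this list would depend on which features were compiled in.
    EXTENSIONS.get_or_init(|| Extensions::new(vec![Box::new(DeltaExtension) as Box<dyn Extension>]))
}

fn main() {
    let ext = extensions();
    assert!(ext.session_context().table_factories.contains_key("DELTATABLE"));
    let scan = DeltaScanExec { location: "s3://bucket/events".to_string() };
    println!("{}", String::from_utf8(ext.codec.try_encode(&scan).unwrap()).unwrap()); // prints: delta_scan:s3://bucket/events
}
```

The `Send + Sync` bounds are what allow the extensions to live in a process-wide `static`. The composite codec here only encodes and simply keeps the first codec that succeeds; a complete implementation would presumably also need to record which codec produced the bytes (for example with a leading tag) so that decoding can dispatch to the same codec.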
I'd be glad to open a PR for contributing this, unless the feature request is out of scope or there are plans to address it differently.