Merged
621 changes: 304 additions & 317 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions Cargo.toml
```diff
@@ -29,17 +29,20 @@
 rust-version = "1.62"
 build = "build.rs"
 
 [dependencies]
-datafusion = { version = "41.0.0", features = ["pyarrow", "avro"] }
-datafusion-proto = "41.0.0"
-datafusion-python = "41.0.0"
+datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
+datafusion-proto = "42.0.0"
+
+# temporarily point to revision until version 42 is released
+datafusion-python = { git = "https://github.com/apache/datafusion-python" }
+
 futures = "0.3"
 log = "0.4"
-prost = "0.12"
-pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] }
+prost = "0.13"
+pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
 tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 
 [build-dependencies]
-prost-types = "0.12"
+prost-types = "0.13"
 rustc_version = "0.4.0"
 tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] }
```
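The `datafusion-python` git dependency above floats with the upstream default branch until version 42 is published. Cargo can pin a git dependency to an exact commit so the build stays reproducible in the meantime; a sketch of that option (the `rev` value here is a placeholder, not the commit this PR built against):

```toml
[dependencies]
# Pin the unreleased datafusion-python to one commit so that `cargo update`
# cannot silently pull in breaking upstream changes before 42.0.0 ships.
# "0123abc" is a placeholder; substitute the commit hash you validated.
datafusion-python = { git = "https://github.com/apache/datafusion-python", rev = "0123abc" }
```

Once 42.0.0 is on crates.io, the entry can revert to a plain `datafusion-python = "42.0.0"` version requirement.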
Expand Down
68 changes: 24 additions & 44 deletions README.md
````diff
@@ -19,31 +19,41 @@
 
 # DataFusion on Ray
 
-> This was originally a research project donated from [ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing distributed SQL queries from Python, using
-> [Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion).
+> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from
+> Python, using [Ray] and [Apache DataFusion]
+
+[ray-sql]: https://github.com/datafusion-contrib/ray-sql
 
-DataFusion Ray is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow](https://arrow.apache.org/), [Apache DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
+DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation
+of [Apache Arrow], [Apache DataFusion], and [Ray].
+
+[Ray]: https://www.ray.io/
+[Apache Arrow]: https://arrow.apache.org/
+[Apache DataFusion]: https://datafusion.apache.org/
 
-## Goals
+## Comparison to other DataFusion projects
 
-- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md)
-  to understand how RaySQL works.
-- Drive requirements for DataFusion's [Python bindings](https://github.com/apache/arrow-datafusion-python).
-- Create content for an interesting blog post or conference talk.
+### Comparison to DataFusion Ballista
+
+- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on
+  Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
+- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first
+
+[DataFusion Ballista]: https://github.com/apache/datafusion-ballista
 
-## Non Goals
+### Comparison to DataFusion Python
 
-- Re-build the cluster scheduling systems like what [Ballista](https://datafusion.apache.org/ballista/) did.
-  - Ballista is extremely complex and utilizing Ray feels like it abstracts some of that complexity away.
-  - Datafusion Ray is delegating cluster management to Ray.
+- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends
+  DataFusion Python to provide scalability across multiple nodes.
+
+[DataFusion Python]: https://github.com/apache/datafusion-python
 
 ## Example
 
 Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing).
 
 ```python
 import os
 import pandas as pd
 import ray
 
 from datafusion_ray import DatafusionRayContext
@@ -54,7 +64,7 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 ray.init(resources={"worker": 1})
 
 # Create a context and register a table
-ctx = DatafusionRayContext(2, use_ray_shuffle=True)
+ctx = DatafusionRayContext(2)
 # Register either a CSV or Parquet file
 # ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
 ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")
@@ -75,36 +85,6 @@ for record_batch in result_set:
 - Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion
 - Support for CSV and Parquet files
 
-## Limitations
-
-- Requires a shared file system currently. Check details [here](./docs/README.md#distributed-shuffle).
-
-## Performance
-
-This chart shows the performance of DataFusion Ray compared to Apache Spark for
-[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set (10GB), running on a desktop (Threadripper
-with 24 physical cores). Both DataFusion Ray and Spark are configured with 24 executors.
-
-### Overall Time
-
-DataFusion Ray is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.
-
-![SQLBench-H Total](./docs/sqlbench-h-total.png)
-
-### Per Query Time
-
-Spark is much faster on some queries, likely due to broadcast exchanges, which DataFusion Ray hasn't implemented yet.
-
-![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png)
-
-### Performance Plan
-
-Plans on experimenting with the following changes to improve performance:
-
-- Make better use of Ray futures to run more tasks in parallel
-- Use Ray object store for shuffle data transfer to reduce disk I/O cost
-- Keep upgrading to newer versions of DataFusion to pick up the latest optimizations
-
 ## Building
 
 ```bash
````
8 changes: 4 additions & 4 deletions src/context.rs
```diff
@@ -174,7 +174,7 @@ pub fn deserialize_execution_plan(bytes: Vec<u8>) -> PyResult<PyExecutionPlan> {
 /// Iterate down an ExecutionPlan and set the input objects for RayShuffleReaderExec.
 fn _set_inputs_for_ray_shuffle_reader(
     plan: Arc<dyn ExecutionPlan>,
-    input_partitions: &PyList,
+    input_partitions: &Bound<'_, PyList>,
 ) -> Result<()> {
     if let Some(reader_exec) = plan.as_any().downcast_ref::<RayShuffleReaderExec>() {
         let exec_stage_id = reader_exec.stage_id;
@@ -200,8 +200,8 @@ fn _set_inputs_for_ray_shuffle_reader(
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?
                 .extract::<usize>()
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
-            let batch = RecordBatch::from_pyarrow(
-                pytuple
+            let batch = RecordBatch::from_pyarrow_bound(
+                &pytuple
                     .get_item(2)
                     .map_err(|e| DataFusionError::Execution(format!("{}", e)))?,
             )
@@ -235,7 +235,7 @@ fn _execute_partition(
     ));
     Python::with_gil(|py| {
         let input_partitions = inputs
-            .as_ref(py)
+            .bind(py)
             .downcast::<PyList>()
             .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
        _set_inputs_for_ray_shuffle_reader(plan.plan.clone(), input_partitions)
```
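The Rust code above reads each input as a Python tuple whose items 0 and 1 extract to `usize` and whose item 2 converts to a pyarrow `RecordBatch`, i.e. the shuffle inputs appear to arrive as `(stage_id, partition_id, batch)` tuples in a Python list. A minimal pure-Python sketch of handling that shape (the function name is hypothetical, and plain strings stand in for the real pyarrow batches):

```python
from collections import defaultdict

def group_inputs_by_stage(input_partitions):
    """Group (stage_id, partition_id, batch) tuples by stage id,
    mirroring the per-stage bucketing that the Rust side performs
    when it walks the plan looking for RayShuffleReaderExec nodes."""
    stages = defaultdict(list)
    for stage_id, partition_id, batch in input_partitions:
        stages[stage_id].append((partition_id, batch))
    return dict(stages)

# Stand-in "batches"; in DataFusion Ray these would be pyarrow RecordBatches.
inputs = [(0, 0, "batch-a"), (0, 1, "batch-b"), (1, 0, "batch-c")]
print(group_inputs_by_stage(inputs))
```

This is only an illustration of the tuple layout implied by the diff, not code from the repository.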