103 changes: 103 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 3 additions & 6 deletions Cargo.toml
@@ -38,6 +38,7 @@ bytesize = "1.3"
datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
datafusion-python = { version = "43.1" }
datafusion-proto = "43.0"
env_logger = "0.11"
futures = "0.3"
glob = "0.3.1"
local-ip-address = "0.6"
@@ -56,6 +57,7 @@ pyo3 = { version = "0.22.6", features = [
"abi3-py38",
] }
pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"] }
pyo3-pylogger = "0.3.0"
rust_decimal = "1.36"
tokio = { version = "1.40", features = [
"macros",
@@ -83,11 +85,6 @@ tonic-build = { version = "0.8", default-features = false, features = [
] }
url = "2"

[dev-dependencies]
#anyhow = "1.0.89"
#pretty_assertions = "1.4.0"
#regex = "1.11.0"

[lib]
name = "datafusion_ray"
crate-type = ["cdylib", "rlib"]
@@ -105,4 +102,4 @@ debug = 0
opt-level = 1

[profile.dev.package."*"]
opt-level = 3
opt-level = 1
8 changes: 8 additions & 0 deletions README.md
@@ -100,10 +100,18 @@ RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/yo

This will output a JSON file in the current directory with query timings.

## Logging

DataFusion Ray's logging output is determined by the `DATAFUSION_RAY_LOG_LEVEL` environment variable. The default log level is `WARN`. To change the log level, set the environment variable to one of the following values: `ERROR`, `WARN`, `INFO`, `DEBUG`, or `TRACE`.

DataFusion Ray emits logs from both Python and Rust. To handle this consistently, the Python logger for `datafusion_ray` is routed to Rust for output. The `RUST_LOG` environment variable can be used to control log output from Rust crates other than `datafusion_ray`.
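
As a quick illustration (not part of this PR), both variables can be set from Python before `datafusion_ray` is first imported, since the logger is configured at import time in `core.py`:

```python
import os

# Assumed usage: set these before the first import of datafusion_ray,
# because setup_logging() runs when the module is imported.
os.environ["DATAFUSION_RAY_LOG_LEVEL"] = "DEBUG"  # datafusion_ray logs
os.environ["RUST_LOG"] = "warn"                   # Rust crates other than datafusion_ray

import datafusion_ray  # noqa: E402
```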

## Status

- DataFusion Ray can execute all TPCH queries. Tested up to SF100.

## Known Issues

- Enabling the DataFusion config setting `datafusion.execution.parquet.pushdown_filters` can produce incorrect results. We believe this is related to an issue with round-trip physical plan serialization: the setting prevents physical plans from serializing correctly. For now, do not enable it.

This should be resolved when we update to a DataFusion version which includes <https://github.com/apache/datafusion/pull/14465#event-16194180382>.
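
Until then, a minimal guard sketch using `datafusion-python` (the option already defaults to `false`, so this only makes the default explicit; `SessionConfig.set` comes from the `datafusion` package, not from this PR):

```python
from datafusion import SessionConfig, SessionContext

# Keep Parquet filter pushdown disabled until the upstream fix lands.
config = SessionConfig().set("datafusion.execution.parquet.pushdown_filters", "false")
ctx = SessionContext(config)
```
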
2 changes: 1 addition & 1 deletion datafusion_ray/__init__.py
@@ -20,6 +20,6 @@
except ImportError:
import importlib_metadata

from .core import RayContext, prettify
from .core import RayContext, prettify, runtime_env

__version__ = importlib_metadata.version(__name__)
57 changes: 39 additions & 18 deletions datafusion_ray/core.py
@@ -17,6 +17,8 @@


from collections import defaultdict
from logging import error, debug, info
import os
import pyarrow as pa
import asyncio
import ray
@@ -30,6 +32,26 @@
)


def setup_logging():
import logging

logging.addLevelName(5, "TRACE")

log_level = os.environ.get("DATAFUSION_RAY_LOG_LEVEL", "WARN").upper()

# this logger gets captured and routed to rust. See src/lib.rs
logging.getLogger("datafusion_ray").setLevel(log_level)
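
Side note (illustrative, not in the diff): because `TRACE` is registered as numeric level 5 and the stdlib `logging` module has no `trace()` helper, emitting a trace record from Python would look like:

```python
import logging

logging.getLogger("datafusion_ray").log(5, "a TRACE-level message")
```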


setup_logging()

_log_level = os.environ.get("DATAFUSION_RAY_LOG_LEVEL", "ERROR")
runtime_env = {
"worker_process_setup_hook": setup_logging,
"env_vars": {"DATAFUSION_RAY_LOG_LEVEL": _log_level, "RAY_worker_niceness": "0"},
}


class RayDataFrame:
def __init__(
self,
@@ -76,21 +98,19 @@ def collect(self) -> list[pa.RecordBatch]:
t1 = time.time()
self.stages()
t2 = time.time()
print(f"creating stages took {t2 -t1}s")
debug(f"creating stages took {t2 -t1}s")

last_stage = max([stage.stage_id for stage in self._stages])
print("last stage is", last_stage)
debug("last stage is", last_stage)

self.create_ray_stages()
t3 = time.time()
print(f"creating ray stage actors took {t3 -t2}s")
debug(f"creating ray stage actors took {t3 -t2}s")
self.run_stages()

addrs = ray.get(self.coord.get_stage_addrs.remote())
print("addrs", addrs)

reader = self.df.read_final_stage(last_stage, addrs[last_stage][0])
print("called df execute, got reader")
self._batches = list(reader)
self.coord.all_done.remote()
return self._batches
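
A hypothetical consumer of `collect()` (not in this diff); `df` is assumed to be a `RayDataFrame` obtained elsewhere, and the result is assumed non-empty:

```python
import pyarrow as pa

batches = df.collect()                  # list[pa.RecordBatch]
table = pa.Table.from_batches(batches)  # assemble into a single Table
print(table.num_rows)
```
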
@@ -110,7 +130,7 @@ def create_ray_stages(self):
for stage in self.stages():
num_shadows = stage.num_shadow_partitions()
if self.isolate_partitions and num_shadows:
print(f"stage {stage.stage_id} has {num_shadows} shadows")
debug(f"stage {stage.stage_id} has {num_shadows} shadows")
for shadow in range(num_shadows):
refs.append(
self.coord.new_stage.remote(
@@ -181,10 +201,10 @@ def __init__(
self.stages_ready = False

async def all_done(self):
print("calling stage all done")
debug("calling stage all done")
refs = [stage.all_done.remote() for stage in self.stages.values()]
ray.wait(refs, num_returns=len(refs))
print("done stage all done")
debug("done stage all done")

async def new_stage(
self,
@@ -195,7 +215,7 @@ async def new_stage(
stage_key = (stage_id, shadow_partition)
try:

print(f"creating new stage {stage_key} from bytes {len(plan_bytes)}")
debug(f"creating new stage {stage_key} from bytes {len(plan_bytes)}")
stage = RayStage.options(
name=f"Stage: {stage_key}, query_id:{self.query_id}",
).remote(
@@ -207,15 +227,16 @@
self.stages_started.append(stage.start_up.remote())

except Exception as e:
print(
error(
f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in new stage! {e}"
)
raise e

async def wait_for_stages_ready(self):
# TODO: signal our doneness instead of loop
while not self.stages_ready:
await asyncio.sleep(0.1)
print("waiting for stages to be ready")
debug("waiting for stages to be ready")

async def ensure_stages_ready(self):
if not self.stages_ready:
@@ -231,10 +252,10 @@ async def get_stage_addrs(self) -> dict[int, list[str]]:
async def sort_out_addresses(self):
for stage_key, stage in self.stages.items():
stage_id, shadow_partition = stage_key
print(f" getting stage addr for {stage_id},{shadow_partition}")
debug(f" getting stage addr for {stage_id},{shadow_partition}")
self.stage_addrs[stage_id].append(await stage.addr.remote())

print(f"stage_addrs: {self.stage_addrs}")
debug(f"stage_addrs: {self.stage_addrs}")
# now update all the stages with the addresses of peers such
# that they can contact their child stages
refs = []
@@ -245,14 +266,14 @@

async def serve(self):
await self.ensure_stages_ready()
print("running stages")
info("running stages")
try:
for stage_key, stage in self.stages.items():
print(f"starting serving of stage {stage_key}")
info(f"starting serving of stage {stage_key}")
stage.serve.remote()

except Exception as e:
print(
error(
f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in run stages! {e}"
)
raise e
@@ -284,7 +305,7 @@ def __init__(
shadow_partition,
)
except Exception as e:
print(
error(
f"StageService[{self.stage_id}{shadow}] Unhandled Exception in init: {e}!"
)
raise
@@ -303,4 +324,4 @@ async def set_stage_addrs(self, stage_addrs: dict[int, list[str]]):

async def serve(self):
await self.stage_service.serve()
print("StageService done serving")
info("StageService done serving")