
Conversation

@robtandy (Contributor) commented Feb 4, 2025

@andygrove, per our collaboration around this, here is the requested PR against main. Tagging @alamb here as well for his additional insight and perspective on query execution strategies, and to follow up on the presentation given at the DataFusion Community Meeting.

TL;DR

This PR contains a pull-based stage execution model where stages are connected using Arrow Flight. It's simpler and more performant than previous iterations of the streaming rewrite. It works on TPCH SF100 and below; larger scale factors have not been tested, though I think it should parallelize well at the expense of some execution overhead.

This may obsolete issue #55, address issue #46, and move toward #2

Evolution of this work

This represents the third iteration of attempts to stream data between stages. A brief accounting of those efforts might be useful to capture here:

  1. Try to use the Ray Object Store to stream batches between stages.
    This was a challenge for two reasons. The first was that, under high throughput of potentially small items, the object store added too much latency to query processing. The second, and the reason this approach was abandoned, was that creating a shuffle writer exec plan node that jumps from Rust to Python to interact with the object store, and potentially calls back into Rust, proved difficult to manage and reason about.

  2. The second attempt, which I have discussed on Discord, was to adopt Arrow Flight for streaming between stages and flip the execution of the stages from pull to push. The thinking was to have each stage eagerly execute and stream batches to an Exchange Actor, which would hold a set of channels (num stages x num partitions per stage) and allow subsequent stages to consume from them.

    The problems here were that the Exchange Actor was difficult to tune and created an additional Arrow Flight hop. Another challenge was that DataFusion is inherently a pull-based architecture, one that is very easy to compose and reason about. Flipping this was like swimming upstream and resulted in a lot of complications that DataFusion already elegantly manages.

    While it was interesting to consider push execution, and it may inform future work to consume from streams and materialize query results, ultimately it meant reimplementing a lot of things that DataFusion just makes easy.

  3. The third attempt, this iteration, is purely pull based and uses Arrow Flight to stream between stages. This turned out to produce the smallest amount of code, and code that was easy to work with and debug. It's as if you are executing DataFusion locally, but some of the execution nodes are connected with Arrow Flight instead of channels.

There is more that can be improved, both performance- and ergonomics-wise, but this is quite usable as it is and will allow others to review and collaborate.

For examples, see the main README, examples/tips.py, and tpch/tpc.py.

Execution Strategy

DataFusion Ray will optimize a query plan and then break it into stages. Those stages will be scheduled as Ray actors and make their partition streams available over Arrow Flight.

Connecting one stage to another means adding a RayStageReaderExec node within the stage where a connection is required; at execution time it fetches the stream using a FlightClient.
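To make the reader side concrete, here is a minimal, illustrative sketch of pulling one upstream partition over Arrow Flight with pyarrow. It is not the PR's RayStageReaderExec (which lives in Rust), and the ticket encoding and address below are assumptions:

```python
import json
import pyarrow.flight as flight

def read_partition(addr: str, stage_id: int, partition: int):
    # Illustrative only: fetch one upstream partition's record batches.
    client = flight.connect(addr)  # e.g. "grpc://worker-host:50051" (hypothetical address)
    # Hypothetical ticket encoding identifying the (stage, partition) to read.
    ticket = flight.Ticket(json.dumps({"stage_id": stage_id, "partition": partition}).encode())
    for chunk in client.do_get(ticket):
        yield chunk.data  # each chunk.data is a pyarrow.RecordBatch
```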

Tunables:

  • --isolate By default, DataFusion Ray will host each stage as its own actor. This flag (in the examples, and a parameter to the RayContext) tells DataFusion Ray to host each partition of each stage as its own actor instead. This dramatically increases parallelism, but it is a blunt instrument; a more finely tuned choice (like splitting a stage into x parts) would be more desirable and can be added in a future update. (See the sketch after this list for how this interacts with --concurrency.)
  • --concurrency controls the partition count for all stages and is planned using DataFusion before submitting control to Ray. This interacts with --isolate.
  • --batch-size controls the target (and also maximum) batch size exchanged between stages. Currently 8192 works for all queries in TPCH SF100. Going higher can produce Flight errors as we exceed the batch payload size.
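To make the interaction between --concurrency and --isolate concrete, here is a small illustrative helper (hypothetical, not code from this PR) that computes how many actors a query would use under the scheduling described above:

```python
def actors_for_query(num_stages: int, concurrency: int, isolate: bool) -> int:
    # Without --isolate: one actor per stage.
    # With --isolate: one actor per (stage, partition), i.e. stages * concurrency.
    return num_stages * concurrency if isolate else num_stages
```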

@robtandy (Contributor, Author) commented Feb 5, 2025

Note that the CI is broken, and I suggest we address it in subsequent work. There are plenty of housekeeping activities to do after this PR to get to a 0.1.0 release. We can file issues for those after this PR is reviewed and/or accepted.

@milenkovicm

One small suggestion related to --isolate: would it make sense to rename it to --actor-per-task or --actor-per-task-context, or something along those lines?
I believe it would align with common naming (Spark/Ballista), where partition handling within a stage is called a task; it would also need less explanation.

@robtandy (Contributor, Author) commented Feb 7, 2025

@milenkovicm , That's a good suggestion. While I wait for review, I'm revisiting this functionality to add finer grained control.

Something desirable is to be able to specify the number of workers for the query. If we did this, maybe --workers? This way you can have a predictable resource allocation, worker wise, and having more concurrent queries on the cluster might be more manageable.

I'm not sure yet of the best way to do this, because after you determine the number of stages, how would you best decide which ones are advantageous to split?

@robtandy (Contributor, Author) commented Feb 7, 2025

Update to this PR: added proper logging output, configured (for both Rust and Python) with the DATAFUSION_RAY_LOG_LEVEL environment variable. The env var and logging settings are propagated to Ray workers.
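For example, the variable can be set before starting the job; the accepted level names are an assumption based on typical Rust/Python logging conventions, not spelled out in this thread:

```python
import os

# Hypothetical usage: enable verbose logging for both the Rust and Python sides;
# per the comment above, the setting is propagated to the Ray workers.
os.environ["DATAFUSION_RAY_LOG_LEVEL"] = "debug"
```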

@milenkovicm

Something desirable is to be able to specify the number of workers for the query. If we did this, maybe --workers? This way you can have a predictable resource allocation, worker wise, and having more concurrent queries on the cluster might be more manageable.

Would --workers be set per session or for the overall Ray cluster?
I apologise if I give too many references to Ballista, but my brain is wired to that concept at the moment.
Overall cluster parallelism in Ballista is tied to the sum of executor parallelism. Specific session-context parallelism can be set as a session configuration parameter.

Would it make sense to use datafusion.execution.target_partitions to control --workers? Ballista had a ballista.shuffle.partitions session configuration option, which would set datafusion.execution.target_partitions to the desired task parallelism.

self.isolate_partitions = isolate_parititions
self.prefetch_buffer_size = prefetch_buffer_size

def stages(self):
Contributor

I have two comments here:

  • for properties we should use the @property decorator, see suggestion above
  • we can factor out the function __init_stages(self)

i.e.:

    @property
    def stages(self):
        if not self._stages:
            self._stages = self._init_stages()
        return self._stages


def optimized_logical_plan(self):
    return self.df.optimized_logical_plan()

Contributor

Suggested change
    @property
    def execution_plan(self):
        return self.df.execution_plan()

    @property
    def logical_plan(self):
        return self.df.logical_plan()

    @property
    def optimized_logical_plan(self):
        return self.df.optimized_logical_plan()

self.run_stages()

addrs = ray.get(self.coord.get_stage_addrs.remote())

Contributor

Suggested change
    if len(addrs[last_stage]) != 1:
        raise ValueError("Unexpected condition: more than one final stage")


def show(self) -> None:
    batches = self.collect()
    print(prettify(batches))
Contributor

Should this remain a print or be an info?

self.ctx.set(option, value)


@ray.remote(num_cpus=0)
Contributor

Why num_cpus=0 ?

Contributor Author

All stages need to be running in order for results to stream through the distributed plan. Setting num_cpus=0 ensures all actors will be scheduled by Ray. If we had a different value, Ray might wait for available resources, and at the moment we do not have a way of knowing that a stage is waiting to be scheduled.

I think future PRs will include better specification of the resources required per query.
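As a concrete illustration of this point (the class name is hypothetical, not the PR's actual actor), a zero-CPU actor declaration looks like this:

```python
import ray

@ray.remote(num_cpus=0)
class StageActor:
    # Requesting zero CPUs means Ray will always schedule the actor immediately,
    # so every stage of the distributed plan can be up before results stream.
    def __init__(self, stage_id: int):
        self.stage_id = stage_id
```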

shadow_partition,
)
except Exception as e:
error(
Contributor

How about if we use a custom type here?

    raise StageServiceError(self.stage_id, shadow) from e
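For illustration, such a custom exception might look like the following sketch (the name and fields are assumptions based on the suggestion above):

```python
class StageServiceError(Exception):
    """Hypothetical error type that carries stage context for easier debugging."""

    def __init__(self, stage_id: int, shadow_partition=None):
        super().__init__(
            f"StageService error in stage {stage_id}"
            f" (shadow partition {shadow_partition})"
        )
        self.stage_id = stage_id
        self.shadow_partition = shadow_partition
```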

from datafusion_ray._datafusion_ray_internal import StageService

self.shadow_partition = shadow_partition
shadow = (
Contributor

We can probably compute this in the exception handler, since it's only used for error handling?

@robtandy (Contributor, Author)

I've refactored the control of parallelizing execution to be more fine-grained. --partitions-per-worker controls the number of partitions hosted by an actor (which gets an entire Ray worker). So if the stage has 10 partitions, concurrency=10, and partitions-per-worker=4, we'll spin up 40 actors to satisfy the query.

Latest TPCH SF100 results, compared with local DataFusion using all cores on a 32-CPU machine with an NVMe drive:

{
    "engine": "datafusion-ray",
    "benchmark": "tpch",
    "settings": {
        "concurrency": 16,
        "batch_size": 8192,
        "prefetch_buffer_size": 0,
        "partitions_per_worker": 4
    },
    "data_path": "file:///data2/sf100/",
    "queries": {
        "1": 14.63571572303772,
        "2": 16.11984419822693,
        "3": 20.260254621505737,
        "4": 16.40132737159729,
        "5": 35.4002046585083,
        "6": 7.793532609939575,
        "7": 49.56708884239197,
        "8": 37.00137710571289,
        "9": 60.13660907745361,
        "10": 38.18756365776062,
        "11": 13.499444484710693,
        "12": 18.93906331062317,
        "13": 14.921503782272339,
        "14": 7.416260004043579,
        "15": 2.373532295227051,
        "16": 8.229618549346924,
        "17": 52.57597255706787,
        "18": 85.48271942138672,
        "19": 10.138697862625122,
        "20": 15.182426929473877,
        "21": 78.81208372116089,
        "22": 8.711960792541504
    },
    "local_queries": {
        "1": 14.912381172180176,
        "2": 10.478784322738647,
        "3": 8.960041284561157,
        "4": 3.8824241161346436,
        "5": 15.605360507965088,
        "6": 1.672469139099121,
        "7": 28.076196432113647,
        "8": 14.546991348266602,
        "9": 26.64270520210266,
        "10": 11.699812173843384,
        "11": 4.682126522064209,
        "12": 4.03217339515686,
        "13": 8.454285621643066,
        "14": 2.875070095062256,
        "15": 0.0013363361358642578,
        "16": 2.2461774349212646,
        "17": 26.58483576774597,
        "18": 61.40281629562378,
        "19": 5.444426774978638,
        "20": 7.112048625946045,
        "21": 36.257577657699585,
        "22": 2.517507314682007
    },
    "validated": {
        "1": true,
        "2": true,
        "3": true,
        "4": true,
        "5": true,
        "6": true,
        "7": true,
        "8": true,
        "9": true,
        "10": true,
        "11": true,
        "12": true,
        "13": true,
        "14": true,
        "15": true,
        "16": true,
        "17": true,
        "18": true,
        "19": true,
        "20": true,
        "21": true,
        "22": true
    }
}

@edmondop (Contributor)
Wow, these results are impressive. I am not sure I understand the consequence of the changes; are you allocating 4x the actors compared to before?

@robtandy (Contributor, Author)

Hey, thanks! The number of actors created per query is number_of_stages_in_query * concurrency / partitions_per_worker. Or, if partitions_per_worker is not set, the number of actors created is number_of_stages_in_query.
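Restating that as a small illustrative helper (hypothetical, not code from this PR), rounding up so that a stage whose partition count is not an exact multiple of partitions_per_worker still gets enough actors:

```python
import math

def actors_created(num_stages: int, concurrency: int, partitions_per_worker: int | None) -> int:
    if partitions_per_worker is None:
        # One actor per stage when --partitions-per-worker is not set.
        return num_stages
    # Otherwise each stage is split across ceil(concurrency / partitions_per_worker) actors.
    return num_stages * math.ceil(concurrency / partitions_per_worker)
```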

@robtandy (Contributor, Author)

Update to the PR to accommodate Python's changes to asyncio.wait that occurred in version 3.11 and above. Tested with 3.10 and 3.12.

We'll sort out more version testing when we sort out CI.
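For context, asyncio.wait() stopped accepting bare coroutines in Python 3.11, so code has to hand it Tasks or Futures explicitly. A hedged sketch of that kind of adjustment (not the exact code in this PR):

```python
import asyncio

async def wait_all(coros):
    # Wrap coroutines in tasks: required on Python 3.11+, harmless on 3.10.
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, _pending = await asyncio.wait(tasks)
    return [t.result() for t in done]
```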

@andygrove (Member)

Thanks @robtandy. There has been a lot of progress and I agree that it would be good to merge this.

The Kubernetes CI tests are failing, which is not surprising. Do we want to disable these tests?

 buildx failed with: ERROR: resolve : lstat k8s: no such file or directory

@milenkovicm @edmondop Do you have any objections to merging this PR as a checkpoint on progress?

@milenkovicm

Please do, @andygrove. My comment is minor; it should not be considered a blocker in any sense.

@robtandy (Contributor, Author) commented Feb 12, 2025

I don't know much yet about setting up CI; if you (@andygrove) or we can disable it, please do. My preference would be to land this, then sort out the housekeeping tasks we need to do to get the repo into a solid state with CI and more user-facing docs, and then release a version.

@andygrove merged commit 071802f into apache:main on Feb 13, 2025
0 of 2 checks passed
@alamb commented Feb 13, 2025

woohoo!

@alamb left a comment

This is very cool -- thank you @robtandy @edmondop and @andygrove ❤️

}

fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
    let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), None)?;

FYI, since you are reading from memory here anyway, I don't think buffering adds much extra value.


Also, once this is available
