-
Notifications
You must be signed in to change notification settings - Fork 370
[server] Push partition filtering to the server-side to reduce network bandwidth usage #1507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[server] Push partition filtering to the server-side to reduce network bandwidth usage #1507
Conversation
…raffic [server] Push partition filtering to server-side for reduced network traffic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @xiaochen-zhou for the contribution. I have some comments for the PR.
Please create an issue and reach consensus on the approach before submitting a pull request.
While this PR does enhance the code, I'm not sure it will significantly reduce network load in practice. The listPartitions call is made every 10 seconds, and typically there are fewer than 100 partitions in total — so the payload size is relatively small.
More importantly, the main bottleneck for this RPC lies in ZooKeeper performance, and since the partition filtering isn't pushed down to the ZooKeeper layer, the server still retrieves all partition data before applying the filter. As a result, we don’t get the expected reduction in backend load.
@@ -263,8 +264,13 @@ private List<SourceSplitBase> initNonPartitionedSplits() { | |||
|
|||
private Set<PartitionInfo> listPartitions() { | |||
try { | |||
List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get(); | |||
partitionInfos = applyPartitionFilter(partitionInfos); | |||
PartitionSpec partitionSpec = convertFiltersToPartitionSpec(partitionFilters); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can convert the filter to partialPartitionSpec
during the construction of FlinkSourceEnumerator
to avoid the repeated conversion for listPartitions
.
Thanks for your review. updated the code. I will create an issue. |
I think we can optimize on the ZK side: when the ResolvedPartitionSpec is a complete partition spec, we can directly retrieve the partition instead of scanning all partitions. This can satisfy some scenarios. |
f22ca0f
to
0cf1be5
Compare
Purpose
Push partition filtering to the server-side to minimize network bandwidth usage.
Brief change log
Tests
API and Format
Documentation