Skip to content

[server] Push partition filtering to the server-side to reduce network bandwidth usage #1507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.PartitionSpec;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -130,6 +131,8 @@ public class FlinkSourceEnumerator

private final List<FieldEqual> partitionFilters;

private PartitionSpec filterPartitionSpec;

public FlinkSourceEnumerator(
TablePath tablePath,
Configuration flussConf,
Expand Down Expand Up @@ -190,6 +193,7 @@ public void start() {
bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(flussAdmin, tablePath);
try {
tableInfo = flussAdmin.getTableInfo(tablePath).get();
filterPartitionSpec = convertFiltersToPartitionSpec(partitionFilters);
lakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
} catch (Exception e) {
throw new FlinkRuntimeException(
Expand Down Expand Up @@ -263,8 +267,12 @@ private List<SourceSplitBase> initNonPartitionedSplits() {

private Set<PartitionInfo> listPartitions() {
try {
List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
partitionInfos = applyPartitionFilter(partitionInfos);
if (filterPartitionSpec == null && !partitionFilters.isEmpty()) {
// Contradictory conditions, no partitions can satisfy
return new HashSet<>();
}
List<PartitionInfo> partitionInfos =
flussAdmin.listPartitionInfos(tablePath, filterPartitionSpec).get();
return new HashSet<>(partitionInfos);
} catch (Exception e) {
throw new FlinkRuntimeException(
Expand All @@ -273,32 +281,27 @@ private Set<PartitionInfo> listPartitions() {
}
}

/** Apply partition filter. */
private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionInfos) {
if (!partitionFilters.isEmpty()) {
return partitionInfos.stream()
.filter(
partitionInfo -> {
Map<String, String> specMap =
partitionInfo.getPartitionSpec().getSpecMap();
// use getFields() instead of getFieldNames() to
// avoid collection construction
List<DataField> fields = tableInfo.getRowType().getFields();
for (FieldEqual filter : partitionFilters) {
String fieldName = fields.get(filter.fieldIndex).getName();
String partitionValue = specMap.get(fieldName);
if (partitionValue == null
|| !filter.equalValue
.toString()
.equals(partitionValue)) {
return false;
}
}
return true;
})
.collect(Collectors.toList());
}
return partitionInfos;
/**
 * Converts the equality partition filters into a single {@link PartitionSpec} that can be pushed
 * to the server when listing partitions.
 *
 * <p>Returns {@code null} in two situations: the filter list is empty (no server-side filtering
 * needed), or two filters constrain the same partition field to different values (contradictory,
 * so no partition can match). Callers distinguish the two cases by also checking whether
 * {@code partitionFilters} is empty.
 *
 * @param partitionFilters equality predicates on partition fields, may be empty
 * @return the merged partition spec, or {@code null} as described above
 */
@Nullable
private PartitionSpec convertFiltersToPartitionSpec(List<FieldEqual> partitionFilters) {
    if (partitionFilters.isEmpty()) {
        return null;
    }
    // use getFields() instead of getFieldNames() to avoid collection construction
    List<DataField> rowFields = tableInfo.getRowType().getFields();
    Map<String, String> partitionValues = new HashMap<>();

    for (FieldEqual equal : partitionFilters) {
        String key = rowFields.get(equal.fieldIndex).getName();
        String value = equal.equalValue.toString();

        String previous = partitionValues.get(key);
        if (previous == null) {
            partitionValues.put(key, value);
        } else if (!previous.equals(value)) {
            // Same partition field required to equal two different values:
            // the conjunction is unsatisfiable.
            return null;
        }
    }

    return new PartitionSpec(partitionValues);
}

/** Init the splits for Fluss. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -230,4 +232,38 @@ public static ResolvedPartitionSpec toResolvedPartitionSpec(PbPartitionSpec pbPa
}
return new ResolvedPartitionSpec(partitionKeys, partitionValues);
}

/**
 * Convert PbPartitionSpec to ResolvedPartitionSpec with partition key ordering. This ensures
 * the partition values are ordered according to the table's partition key sequence.
 *
 * <p>Falls back to the order-as-received overload whenever the wire spec cannot be matched
 * one-to-one against {@code orderedPartitionKeys}: either the sizes differ, or the spec carries
 * a key that is not a known partition key.
 */
public static ResolvedPartitionSpec toResolvedPartitionSpec(
        PbPartitionSpec pbPartitionSpec, List<String> orderedPartitionKeys) {

    List<PbKeyValue> pbKeyValues = pbPartitionSpec.getPartitionKeyValuesList();
    if (pbKeyValues.size() != orderedPartitionKeys.size()) {
        // Not a full spec; keep the order given on the wire.
        return toResolvedPartitionSpec(pbPartitionSpec);
    }

    Map<String, String> valueByKey = new HashMap<>();
    for (PbKeyValue pbKeyValue : pbKeyValues) {
        String key = pbKeyValue.getKey();
        if (!orderedPartitionKeys.contains(key)) {
            // Unknown partition key; give up on reordering.
            return toResolvedPartitionSpec(pbPartitionSpec);
        }
        valueByKey.put(key, pbKeyValue.getValue());
    }

    // Re-emit the key/value pairs following the table's declared partition key order.
    List<String> reorderedKeys = new ArrayList<>();
    List<String> reorderedValues = new ArrayList<>();
    for (String key : orderedPartitionKeys) {
        String value = valueByKey.get(key);
        if (value == null) {
            continue;
        }
        reorderedKeys.add(key);
        reorderedValues.add(value);
    }

    return new ResolvedPartitionSpec(reorderedKeys, reorderedValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,17 +404,18 @@ public CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecuri
/**
 * Lists partition infos for the given table, optionally filtered by a partial partition spec
 * supplied in the request.
 *
 * <p>NOTE: this span contained diff residue — the pre-change lines (the late
 * {@code tableInfo}/{@code partitionKeys} lookup and the one-argument
 * {@code toResolvedPartitionSpec} call) were interleaved with their replacements, yielding
 * duplicate local declarations. This is the coherent post-change version: the table's partition
 * keys are resolved up front so they can order the values of the request's partial spec.
 */
public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
        ListPartitionInfosRequest request) {
    TablePath tablePath = toTablePath(request.getTablePath());
    TableInfo tableInfo = metadataManager.getTable(tablePath);
    List<String> partitionKeys = tableInfo.getPartitionKeys();

    Map<String, Long> partitionNameAndIds;
    if (request.hasPartialPartitionSpec()) {
        // Order the spec's values by the table's partition key sequence before filtering.
        ResolvedPartitionSpec partitionSpecFromRequest =
                toResolvedPartitionSpec(request.getPartialPartitionSpec(), partitionKeys);
        partitionNameAndIds =
                metadataManager.listPartitions(tablePath, partitionSpecFromRequest);
    } else {
        partitionNameAndIds = metadataManager.listPartitions(tablePath);
    }
    return CompletableFuture.completedFuture(
            toListPartitionInfosResponse(partitionKeys, partitionNameAndIds));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@
import java.util.OptionalLong;
import java.util.Set;

import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;

/**
* This class includes methods for write/read various metadata (leader address, tablet server
* registration, table assignment, table, schema) in Zookeeper.
Expand Down Expand Up @@ -449,14 +447,21 @@ public Map<String, Long> getPartitionNameAndIds(
throws Exception {
Map<String, Long> partitions = new HashMap<>();

for (String partitionName : getPartitions(tablePath)) {
ResolvedPartitionSpec resolvedPartitionSpec =
fromPartitionName(partitionKeys, partitionName);
boolean contains = resolvedPartitionSpec.contains(partialPartitionSpec);
if (contains) {
Optional<TablePartition> optPartition = getPartition(tablePath, partitionName);
optPartition.ifPresent(
partition -> partitions.put(partitionName, partition.getPartitionId()));
if (partitionKeys.size() == partialPartitionSpec.getPartitionKeys().size()) {
String exactPartitionName = partialPartitionSpec.getPartitionName();
Optional<TablePartition> optPartition = getPartition(tablePath, exactPartitionName);
optPartition.ifPresent(
partition -> partitions.put(exactPartitionName, partition.getPartitionId()));
} else {
for (String partitionName : getPartitions(tablePath)) {
ResolvedPartitionSpec resolvedPartitionSpec =
ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName);
boolean contains = resolvedPartitionSpec.contains(partialPartitionSpec);
if (contains) {
Optional<TablePartition> optPartition = getPartition(tablePath, partitionName);
optPartition.ifPresent(
partition -> partitions.put(partitionName, partition.getPartitionId()));
}
}
}

Expand Down