Skip to content

Commit 0cf1be5

Browse files
committed
ordered partition keys
1 parent dc35f81 commit 0cf1be5

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import java.nio.ByteBuffer;
5050
import java.util.ArrayList;
5151
import java.util.Collection;
52+
import java.util.HashMap;
5253
import java.util.List;
54+
import java.util.Map;
5355
import java.util.UUID;
5456
import java.util.stream.Collectors;
5557

@@ -230,4 +232,38 @@ public static ResolvedPartitionSpec toResolvedPartitionSpec(PbPartitionSpec pbPa
230232
}
231233
return new ResolvedPartitionSpec(partitionKeys, partitionValues);
232234
}
235+
236+
/**
237+
* Convert PbPartitionSpec to ResolvedPartitionSpec with partition key ordering. This ensures
238+
* the partition values are ordered according to the table's partition key sequence.
239+
*/
240+
public static ResolvedPartitionSpec toResolvedPartitionSpec(
241+
PbPartitionSpec pbPartitionSpec, List<String> orderedPartitionKeys) {
242+
243+
List<PbKeyValue> partitionKeyValuesList = pbPartitionSpec.getPartitionKeyValuesList();
244+
if (partitionKeyValuesList.size() != orderedPartitionKeys.size()) {
245+
return toResolvedPartitionSpec(pbPartitionSpec);
246+
}
247+
248+
Map<String, String> pbkeyValueMap = new HashMap<>();
249+
for (PbKeyValue pbKeyValue : partitionKeyValuesList) {
250+
if (!orderedPartitionKeys.contains(pbKeyValue.getKey())) {
251+
return toResolvedPartitionSpec(pbPartitionSpec);
252+
}
253+
pbkeyValueMap.put(pbKeyValue.getKey(), pbKeyValue.getValue());
254+
}
255+
256+
List<String> partitionKeys = new ArrayList<>();
257+
List<String> orderedPartitionValues = new ArrayList<>();
258+
259+
for (String orderedKey : orderedPartitionKeys) {
260+
String value = pbkeyValueMap.get(orderedKey);
261+
if (value != null) {
262+
partitionKeys.add(orderedKey);
263+
orderedPartitionValues.add(value);
264+
}
265+
}
266+
267+
return new ResolvedPartitionSpec(partitionKeys, orderedPartitionValues);
268+
}
233269
}

fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,17 +404,18 @@ public CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecuri
404404
public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
405405
ListPartitionInfosRequest request) {
406406
TablePath tablePath = toTablePath(request.getTablePath());
407+
TableInfo tableInfo = metadataManager.getTable(tablePath);
408+
List<String> partitionKeys = tableInfo.getPartitionKeys();
409+
407410
Map<String, Long> partitionNameAndIds;
408411
if (request.hasPartialPartitionSpec()) {
409412
ResolvedPartitionSpec partitionSpecFromRequest =
410-
toResolvedPartitionSpec(request.getPartialPartitionSpec());
413+
toResolvedPartitionSpec(request.getPartialPartitionSpec(), partitionKeys);
411414
partitionNameAndIds =
412415
metadataManager.listPartitions(tablePath, partitionSpecFromRequest);
413416
} else {
414417
partitionNameAndIds = metadataManager.listPartitions(tablePath);
415418
}
416-
TableInfo tableInfo = metadataManager.getTable(tablePath);
417-
List<String> partitionKeys = tableInfo.getPartitionKeys();
418419
return CompletableFuture.completedFuture(
419420
toListPartitionInfosResponse(partitionKeys, partitionNameAndIds));
420421
}

0 commit comments

Comments
 (0)