Skip to content

Commit f22ca0f

Browse files
committed
ordered partition keys
1 parent 32e0ee7 commit f22ca0f

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -240,21 +240,30 @@ public static ResolvedPartitionSpec toResolvedPartitionSpec(PbPartitionSpec pbPa
240240
public static ResolvedPartitionSpec toResolvedPartitionSpec(
241241
PbPartitionSpec pbPartitionSpec, List<String> orderedPartitionKeys) {
242242

243-
Map<String, String> keyValueMap = new HashMap<>();
244-
for (PbKeyValue pbKeyValue : pbPartitionSpec.getPartitionKeyValuesList()) {
245-
keyValueMap.put(pbKeyValue.getKey(), pbKeyValue.getValue());
243+
List<PbKeyValue> partitionKeyValuesList = pbPartitionSpec.getPartitionKeyValuesList();
244+
if (partitionKeyValuesList.size() != orderedPartitionKeys.size()) {
245+
return toResolvedPartitionSpec(pbPartitionSpec);
246+
}
247+
248+
Map<String, String> pbkeyValueMap = new HashMap<>();
249+
for (PbKeyValue pbKeyValue : partitionKeyValuesList) {
250+
if (!orderedPartitionKeys.contains(pbKeyValue.getKey())) {
251+
return toResolvedPartitionSpec(pbPartitionSpec);
252+
}
253+
pbkeyValueMap.put(pbKeyValue.getKey(), pbKeyValue.getValue());
246254
}
247-
List<String> presentKeys = new ArrayList<>();
248-
List<String> orderedValues = new ArrayList<>();
255+
256+
List<String> partitionKeys = new ArrayList<>();
257+
List<String> orderedPartitionValues = new ArrayList<>();
249258

250259
for (String orderedKey : orderedPartitionKeys) {
251-
String value = keyValueMap.get(orderedKey);
260+
String value = pbkeyValueMap.get(orderedKey);
252261
if (value != null) {
253-
presentKeys.add(orderedKey);
254-
orderedValues.add(value);
262+
partitionKeys.add(orderedKey);
263+
orderedPartitionValues.add(value);
255264
}
256265
}
257266

258-
return new ResolvedPartitionSpec(presentKeys, orderedValues);
267+
return new ResolvedPartitionSpec(partitionKeys, orderedPartitionValues);
259268
}
260269
}

0 commit comments

Comments
 (0)