Skip to content

Commit a86a5a7

Browse files
authored
Allow renaming group-by fields to existing field names (#4586)
* Rename fields to intended ones after aggregation Signed-off-by: Yuanchun Shen <[email protected]> * Add a defense check Signed-off-by: Yuanchun Shen <[email protected]> * Remove defense check Signed-off-by: Yuanchun Shen <[email protected]> * Handle cases where there exist duplicated group keys Signed-off-by: Yuanchun Shen <[email protected]> --------- Signed-off-by: Yuanchun Shen <[email protected]>
1 parent 05d6594 commit a86a5a7

File tree

2 files changed

+143
-1
lines changed

2 files changed

+143
-1
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,12 +1000,56 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
10001000
Pair<List<RexNode>, List<AggCall>> reResolved =
10011001
resolveAttributesForAggregation(groupExprList, aggExprList, context);
10021002

1003+
List<String> intendedGroupKeyAliases = getGroupKeyNamesAfterAggregation(reResolved.getLeft());
10031004
context.relBuilder.aggregate(
10041005
context.relBuilder.groupKey(reResolved.getLeft()), reResolved.getRight());
1006+
// During aggregation, Calcite projects both input dependencies and output group-by fields.
1007+
// When names conflict, Calcite adds numeric suffixes (e.g., "value0").
1008+
// Apply explicit renaming to restore the intended aliases.
1009+
context.relBuilder.rename(intendedGroupKeyAliases);
10051010

10061011
return Pair.of(reResolved.getLeft(), reResolved.getRight());
10071012
}
10081013

1014+
/**
1015+
* Imitates {@code Registrar.registerExpression} of {@link RelBuilder} to derive the output order
1016+
* of group-by keys after aggregation.
1017+
*
1018+
* <p>The projected input reference comes first, while any other computed expression follows.
1019+
*/
1020+
private List<String> getGroupKeyNamesAfterAggregation(List<RexNode> nodes) {
1021+
List<RexNode> reordered = new ArrayList<>();
1022+
List<RexNode> left = new ArrayList<>();
1023+
for (RexNode n : nodes) {
1024+
// The same group-key won't be added twice
1025+
if (reordered.contains(n) || left.contains(n)) {
1026+
continue;
1027+
}
1028+
if (isInputRef(n)) {
1029+
reordered.add(n);
1030+
} else {
1031+
left.add(n);
1032+
}
1033+
}
1034+
reordered.addAll(left);
1035+
return reordered.stream()
1036+
.map(this::extractAliasLiteral)
1037+
.flatMap(Optional::stream)
1038+
.map(RexLiteral::stringValue)
1039+
.toList();
1040+
}
1041+
1042+
/** Whether a rex node is an aliased input reference */
1043+
private boolean isInputRef(RexNode node) {
1044+
return switch (node.getKind()) {
1045+
case AS, DESCENDING, NULLS_FIRST, NULLS_LAST -> {
1046+
final List<RexNode> operands = ((RexCall) node).operands;
1047+
yield isInputRef(operands.getFirst());
1048+
}
1049+
default -> node instanceof RexInputRef;
1050+
};
1051+
}
1052+
10091053
/**
10101054
* Resolve attributes for aggregation.
10111055
*
@@ -1104,7 +1148,7 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11041148
aggregationAttributes.getLeft().stream()
11051149
.map(this::extractAliasLiteral)
11061150
.flatMap(Optional::stream)
1107-
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
1151+
.map(ref -> ref.getValueAs(String.class))
11081152
.map(context.relBuilder::field)
11091153
.map(f -> (RexNode) f)
11101154
.toList();
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
setup:
2+
- do:
3+
indices.create:
4+
index: time_test
5+
- do:
6+
query.settings:
7+
body:
8+
transient:
9+
plugins.calcite.enabled : true
10+
11+
- do:
12+
bulk:
13+
refresh: true
14+
body:
15+
- '{"index": {"_index": "time_test"}}'
16+
- '{"category":"A","value":1000,"@timestamp":"2024-01-01T00:00:00Z"}'
17+
- '{"index": {"_index": "time_test"}}'
18+
- '{"category":"B","value":2000,"@timestamp":"2024-01-01T00:05:00Z"}'
19+
- '{"index": {"_index": "time_test"}}'
20+
- '{"category":"A","value":1500,"@timestamp":"2024-01-01T00:10:00Z"}'
21+
- '{"index": {"_index": "time_test"}}'
22+
- '{"category":"C","value":3000,"@timestamp":"2024-01-01T00:20:00Z"}'
23+
24+
---
25+
teardown:
26+
- do:
27+
query.settings:
28+
body:
29+
transient:
30+
plugins.calcite.enabled : false
31+
32+
---
33+
"Test span aggregation with field name collision - basic case":
34+
- skip:
35+
features:
36+
- headers
37+
- allowed_warnings
38+
- do:
39+
headers:
40+
Content-Type: 'application/json'
41+
ppl:
42+
body:
43+
query: source=time_test | stats count() by span(value, 1000) as value
44+
45+
- match: { total: 3 }
46+
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "value", "type": "bigint"}] }
47+
- match: { datarows: [[2, 1000], [1, 2000], [1, 3000]] }
48+
49+
---
50+
"Test span aggregation with field name collision - multiple aggregations":
51+
- skip:
52+
features:
53+
- headers
54+
- allowed_warnings
55+
- do:
56+
headers:
57+
Content-Type: 'application/json'
58+
ppl:
59+
body:
60+
query: source=time_test | stats count(), avg(value) by span(value, 1000) as value
61+
62+
- match: { total: 3 }
63+
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "avg(value)", "type": "double"}, {"name": "value", "type": "bigint"}] }
64+
- match: { datarows: [[2, 1250.0, 1000], [1, 2000.0, 2000], [1, 3000.0, 3000]] }
65+
66+
---
67+
"Test span aggregation without name collision - multiple group-by":
68+
- skip:
69+
features:
70+
- headers
71+
- allowed_warnings
72+
- do:
73+
headers:
74+
Content-Type: 'application/json'
75+
ppl:
76+
body:
77+
query: source=time_test | stats count() by span(@timestamp, 10min) as @timestamp, category, value
78+
79+
- match: { total: 4 }
80+
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "category", "type": "string"}, {"name": "value", "type": "bigint"}] }
81+
- match: { datarows: [[1, "2024-01-01 00:00:00", "A", 1000], [1, "2024-01-01 00:10:00", "A", 1500], [1, "2024-01-01 00:00:00", "B", 2000], [1, "2024-01-01 00:20:00", "C", 3000]] }
82+
83+
---
84+
"Test span aggregation with duplicated group keys":
85+
- skip:
86+
features:
87+
- headers
88+
- allowed_warnings
89+
- do:
90+
headers:
91+
Content-Type: 'application/json'
92+
ppl:
93+
body:
94+
query: source=time_test | stats count() by value, value, span(@timestamp, 10min) as @timestamp
95+
96+
- match: { total: 4 }
97+
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "value", "type": "bigint"}, {"name": "value0", "type": "bigint"}] }
98+
- match: { datarows: [[1, "2024-01-01 00:00:00", 1000, 1000], [1, "2024-01-01 00:10:00", 1500, 1500], [1, "2024-01-01 00:00:00", 2000, 2000], [1, "2024-01-01 00:20:00", 3000, 3000]] }

0 commit comments

Comments
 (0)