Skip to content

Commit 21667c7

Browse files
committed
Keep categories with max summed values when top k is set
Signed-off-by: Yuanchun Shen <[email protected]>
1 parent 96e5303 commit 21667c7

File tree

14 files changed

+150
-124
lines changed

14 files changed

+150
-124
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,15 +1131,16 @@ private void visitAggregation(
11311131
UnresolvedExpression span = node.getSpan();
11321132
if (Objects.nonNull(span)) {
11331133
groupExprList.add(span);
1134-
if (getTimeSpanField(span).isPresent()){
1135-
nonNullGroupMask.set(0);
1134+
if (getTimeSpanField(span).isPresent()) {
1135+
nonNullGroupMask.set(0);
11361136
}
11371137
}
11381138
groupExprList.addAll(node.getGroupExprList());
11391139

11401140
// add stats hint to LogicalAggregation
11411141
boolean toAddHintsOnAggregate =
1142-
nonNullGroupMask.nextClearBit(0) >= groupExprList.size() // This checks if all group-bys are nonnull
1142+
nonNullGroupMask.nextClearBit(0)
1143+
>= groupExprList.size() // This checks if all group-bys should be nonnull
11431144
&& !groupExprList.isEmpty()
11441145
&& !(groupExprList.size() == 1 && getTimeSpanField(span).isPresent());
11451146
// add isNotNull filter before aggregation for non-nullable buckets
@@ -2413,10 +2414,6 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) {
24132414
}
24142415
visitAggregation(aggregation, context, false, nonNullGroupMask);
24152416
RelBuilder relBuilder = context.relBuilder;
2416-
String columnSplitName =
2417-
relBuilder.peek().getRowType().getFieldNames().size() > 2
2418-
? relBuilder.peek().getRowType().getFieldNames().get(1)
2419-
: null;
24202417

24212418
// If row or column split does not present or limit equals 0, this is the same as `stats agg
24222419
// [group by col]` because all truncating is performed on the column split
@@ -2457,16 +2454,13 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) {
24572454
final String GRAND_TOTAL_COL = "__grand_total__";
24582455
relBuilder.aggregate(
24592456
relBuilder.groupKey(relBuilder.field(0)),
2460-
buildAggCall(context.relBuilder, aggFunction, relBuilder.field(1))
2461-
.as(GRAND_TOTAL_COL)); // results: group key, agg calls
2457+
// Top-K semantic: Retain categories whose summed values are among the greatest
2458+
relBuilder.sum(relBuilder.field(1)).as(GRAND_TOTAL_COL)); // results: group key, agg calls
24622459
RexNode grandTotal = relBuilder.field(GRAND_TOTAL_COL);
2463-
// Apply sorting: for MIN/EARLIEST, reverse the top/bottom logic
2464-
boolean smallestFirst =
2465-
aggFunction == BuiltinFunctionName.MIN || aggFunction == BuiltinFunctionName.EARLIEST;
2466-
if (config.top != smallestFirst) {
2460+
// Apply sorting: keep the max values if top is set
2461+
if (config.top) {
24672462
grandTotal = relBuilder.desc(grandTotal);
24682463
}
2469-
24702464
// Always set it to null last so that it does not interfere with top / bottom calculation
24712465
grandTotal = relBuilder.nullsLast(grandTotal);
24722466
RexNode rowNum =
@@ -2518,6 +2512,7 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) {
25182512
relBuilder.literal(config.otherStr));
25192513
}
25202514

2515+
String columnSplitName = ((Alias) node.getColumnSplit()).getName();
25212516
String aggFieldName = relBuilder.peek().getRowType().getFieldNames().get(2);
25222517
relBuilder.project(
25232518
relBuilder.field(0),

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteChartCommandIT.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import static org.opensearch.sql.util.MatcherUtils.rows;
1414
import static org.opensearch.sql.util.MatcherUtils.schema;
1515
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
16+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRowsInOrder;
1617
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
1718

1819
import java.io.IOException;
@@ -221,10 +222,11 @@ public void testChartLimitTopWithUseOther() throws IOException {
221222
schema("max(severityNumber)", "bigint"));
222223
verifyDataRows(
223224
result,
224-
rows(1, "max_among_other", 17),
225-
rows(0, "max_among_other", 22),
226-
rows(0, "FATAL3", 23),
227-
rows(0, "FATAL4", 24));
225+
rows(0, "ERROR", 17),
226+
rows(0, "FATAL4", 24),
227+
rows(0, "max_among_other", 23),
228+
rows(1, "ERROR", 17),
229+
rows(1, "max_among_other", 9));
228230
}
229231

230232
@Test
@@ -255,12 +257,13 @@ public void testChartLimitTopWithMinAgg() throws IOException {
255257
schema("flags", "bigint"),
256258
schema("severityText", "string"),
257259
schema("min(severityNumber)", "bigint"));
258-
verifyDataRows(
260+
verifyDataRowsInOrder(
259261
result,
260-
rows(1, "OTHER", 9),
261-
rows(1, "TRACE", 1),
262-
rows(0, "OTHER", 3),
263-
rows(0, "TRACE2", 2));
262+
rows(0, "ERROR", 17),
263+
rows(0, "FATAL4", 24),
264+
rows(0, "OTHER", 2),
265+
rows(1, "ERROR", 17),
266+
rows(1, "OTHER", 1));
264267
}
265268

266269
@Test
@@ -316,19 +319,17 @@ public void testChartUseNullFalseWithNullStr() throws IOException {
316319
}
317320

318321
@Test
319-
public void testChartNullsInRowSplitShouldBeIgnored() throws IOException {
320-
JSONObject result =
321-
executeQuery(
322-
"source=events_null | chart min(cpu_usage) by host region");
323-
verifySchema(
324-
result,
325-
schema("host", "string"),
326-
schema("region", "string"),
327-
schema("min(cpu_usage)", "double"));
328-
verifyDataRows(
329-
result,
330-
rows("db-01", "eu-west", 42.1),
331-
rows("web-01", "us-east", 45.2),
332-
rows("web-02", "us-west", 38.7));
322+
public void testChartNullsInRowSplitShouldBeIgnored() throws IOException {
323+
JSONObject result = executeQuery("source=events_null | chart min(cpu_usage) by host region");
324+
verifySchema(
325+
result,
326+
schema("host", "string"),
327+
schema("region", "string"),
328+
schema("min(cpu_usage)", "double"));
329+
verifyDataRows(
330+
result,
331+
rows("db-01", "eu-west", 42.1),
332+
rows("web-01", "us-east", 45.2),
333+
rows("web-02", "us-west", 38.7));
333334
}
334335
}

integ-test/src/test/resources/expectedOutput/calcite/explain_chart_multiple_group_keys.yaml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ calcite:
1111
LogicalFilter(condition=[IS NOT NULL($4)])
1212
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
1313
LogicalProject(age=[$0], __grand_total__=[$1], _row_number_chart_=[ROW_NUMBER() OVER (ORDER BY $1 DESC NULLS LAST)])
14-
LogicalAggregate(group=[{0}], __grand_total__=[AVG($1)])
14+
LogicalAggregate(group=[{0}], __grand_total__=[SUM($1)])
1515
LogicalFilter(condition=[IS NOT NULL($0)])
1616
LogicalProject(age=[SAFE_CAST($1)], avg(balance)=[$2])
1717
LogicalAggregate(group=[{0, 1}], avg(balance)=[AVG($2)])
@@ -31,7 +31,6 @@ calcite:
3131
EnumerableSort(sort0=[$0], dir0=[ASC])
3232
EnumerableCalc(expr#0..2=[{inputs}], age=[$t0], $1=[$t2])
3333
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
34-
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], age=[$t0], __grand_total__=[$t7])
35-
EnumerableAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
36-
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[SAFE_CAST($t0)], expr#3=[IS NOT NULL($t2)], age=[$t2], avg(balance)=[$t1], $condition=[$t3])
37-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[gender, balance, age], FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(balance)=AVG($2)), PROJECT->[age, avg(balance)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["gender","balance","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
34+
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
35+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[SAFE_CAST($t0)], expr#3=[IS NOT NULL($t2)], age=[$t2], avg(balance)=[$t1], $condition=[$t3])
36+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[gender, balance, age], FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(balance)=AVG($2)), PROJECT->[age, avg(balance)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["gender","balance","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_chart_null_str.yaml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ calcite:
1111
LogicalFilter(condition=[IS NOT NULL($4)])
1212
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank_with_null_values]])
1313
LogicalProject(age=[$0], __grand_total__=[$1], _row_number_chart_=[ROW_NUMBER() OVER (ORDER BY $1 DESC NULLS LAST)])
14-
LogicalAggregate(group=[{0}], __grand_total__=[AVG($1)])
14+
LogicalAggregate(group=[{0}], __grand_total__=[SUM($1)])
1515
LogicalFilter(condition=[IS NOT NULL($0)])
1616
LogicalProject(age=[SAFE_CAST($1)], avg(balance)=[$2])
1717
LogicalAggregate(group=[{0, 2}], avg(balance)=[AVG($1)])
@@ -33,9 +33,8 @@ calcite:
3333
EnumerableSort(sort0=[$0], dir0=[ASC])
3434
EnumerableCalc(expr#0..2=[{inputs}], age=[$t0], $1=[$t2])
3535
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
36-
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], age=[$t0], __grand_total__=[$t7])
37-
EnumerableAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
38-
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[SAFE_CAST($t1)], expr#5=[0], expr#6=[=($t3, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t2)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t3)], expr#11=[IS NOT NULL($t4)], age=[$t4], avg(balance)=[$t10], $condition=[$t11])
39-
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
40-
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[10], expr#4=[null:NULL], expr#5=[SPAN($t2, $t3, $t4)], gender=[$t1], balance=[$t0], age0=[$t5])
41-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank_with_null_values]], PushDownContext=[[PROJECT->[balance, gender, age], FILTER->IS NOT NULL($1)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["balance","gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
36+
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
37+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[SAFE_CAST($t1)], expr#5=[0], expr#6=[=($t3, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t2)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t3)], expr#11=[IS NOT NULL($t4)], age=[$t4], avg(balance)=[$t10], $condition=[$t11])
38+
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
39+
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[10], expr#4=[null:NULL], expr#5=[SPAN($t2, $t3, $t4)], gender=[$t1], balance=[$t0], age0=[$t5])
40+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank_with_null_values]], PushDownContext=[[PROJECT->[balance, gender, age], FILTER->IS NOT NULL($1)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["balance","gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_chart_timestamp_span.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ calcite:
1111
LogicalFilter(condition=[IS NOT NULL($3)])
1212
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
1313
LogicalProject(category=[$0], __grand_total__=[$1], _row_number_chart_=[ROW_NUMBER() OVER (ORDER BY $1 DESC NULLS LAST)])
14-
LogicalAggregate(group=[{0}], __grand_total__=[MAX($1)])
14+
LogicalAggregate(group=[{0}], __grand_total__=[SUM($1)])
1515
LogicalFilter(condition=[IS NOT NULL($0)])
1616
LogicalProject(category=[$0], max(value)=[$2])
1717
LogicalAggregate(group=[{0, 2}], max(value)=[MAX($1)])
@@ -28,4 +28,5 @@ calcite:
2828
EnumerableSort(sort0=[$0], dir0=[ASC])
2929
EnumerableCalc(expr#0..2=[{inputs}], category=[$t0], $1=[$t2])
3030
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
31-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[category, value, timestamp], FILTER->IS NOT NULL($2), FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},__grand_total__=MAX($1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"filter":[{"exists":{"field":"timestamp","boost":1.0}},{"exists":{"field":"category","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["category","value","timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"__grand_total__":{"max":{"field":"value"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
31+
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
32+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[category, value, timestamp], FILTER->IS NOT NULL($2), FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},max(value)=MAX($1)), PROJECT->[category, max(value)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"filter":[{"exists":{"field":"timestamp","boost":1.0}},{"exists":{"field":"category","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["category","value","timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"timestamp0":{"date_histogram":{"field":"timestamp","missing_bucket":false,"order":"asc","calendar_interval":"1w"}}}]},"aggregations":{"max(value)":{"max":{"field":"value"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_chart_use_other.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ calcite:
1010
LogicalFilter(condition=[IS NOT NULL($23)])
1111
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_otel_logs]])
1212
LogicalProject(severityText=[$0], __grand_total__=[$1], _row_number_chart_=[ROW_NUMBER() OVER (ORDER BY $1 DESC NULLS LAST)])
13-
LogicalAggregate(group=[{0}], __grand_total__=[MAX($1)])
13+
LogicalAggregate(group=[{0}], __grand_total__=[SUM($1)])
1414
LogicalFilter(condition=[IS NOT NULL($0)])
1515
LogicalProject(severityText=[$1], max(severityNumber)=[$2])
1616
LogicalAggregate(group=[{0, 1}], max(severityNumber)=[MAX($2)])
@@ -27,4 +27,5 @@ calcite:
2727
EnumerableSort(sort0=[$0], dir0=[ASC])
2828
EnumerableCalc(expr#0..2=[{inputs}], severityText=[$t0], $1=[$t2])
2929
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
30-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_otel_logs]], PushDownContext=[[PROJECT->[severityText, flags, severityNumber], FILTER->IS NOT NULL($1), FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},__grand_total__=MAX($2))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"filter":[{"exists":{"field":"flags","boost":1.0}},{"exists":{"field":"severityText","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["severityText","flags","severityNumber"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"severityText":{"terms":{"field":"severityText","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"__grand_total__":{"max":{"field":"severityNumber"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
30+
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
31+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_otel_logs]], PushDownContext=[[PROJECT->[severityText, flags, severityNumber], FILTER->IS NOT NULL($1), FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},max(severityNumber)=MAX($2)), PROJECT->[severityText, max(severityNumber)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"filter":[{"exists":{"field":"flags","boost":1.0}},{"exists":{"field":"severityText","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["severityText","flags","severityNumber"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"flags":{"terms":{"field":"flags","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"severityText":{"terms":{"field":"severityText","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"max(severityNumber)":{"max":{"field":"severityNumber"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)