Commit 18ab4dc

Pushdown case function in aggregations as range queries (opensearch-project#4400)
* WIP: implementing case range analyzer
* Correct case analyzer
* Create bucket aggregation parsers that support parsing nested sub aggregations
* Fix unit tests
* Fix parsers to multi-range cases
* Update leaf bucket parser
* Unit test case range analyzer
* Add explain ITs for pushing down case in aggregations
* Update CaseRangeAnalyzerTest
* Add a yaml test that replicates issue 4201
* Add integration tests for case in aggregation
* Fix unit tests
* Add a patch to CalcitePPLCaseFunctionIT
* Migrate all composite aggregation parser usage to bucket aggregate parser
* Create a parent abstract class for BucketAggregationParsers
* Remove an unnecessary bucket agg in AggregationQueryBuilder
* Test pushing down case where null values exist
* Return empty in CaseRangeAnalyzer to unblock the rest of the pushdown
  - Additionally test number as result expressions
* Document limitations of pushing down case as range queries
* Make case pushdown a private method
* Chores: remove unused helper method
* Unify logic for creating nested aggregations
* Remove a note in condition.rst
* Optimize range aggregation
* Ignore testNestedAggregationsExplain when pushdown is disabled
* Fix explain ITs after merge

---------

Signed-off-by: Yuanchun Shen <[email protected]>
1 parent bdd42bc commit 18ab4dc

42 files changed, +2211 -143 lines changed

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 4 additions & 8 deletions
@@ -967,19 +967,15 @@ void populate() {
           XOR,
           SqlStdOperatorTable.NOT_EQUALS,
           PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.BOOLEAN));
-      // SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a
-      // type checker
-      // for it. The second and third operands are required to be of the same type. If
-      // not,
-      // it will throw an IllegalArgumentException with information Can't find
-      // leastRestrictive type
+      // SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a type checker
+      // for it. The second and third operands are required to be of the same type. If not, it will
+      // throw an IllegalArgumentException with information Can't find leastRestrictive type
       registerOperator(
           IF,
           SqlStdOperatorTable.CASE,
           PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.ANY, SqlTypeFamily.ANY));
       // Re-define the type checker for is not null, is present, and is null since
-      // their original
-      // type checker ANY isn't compatible with struct types.
+      // their original type checker ANY isn't compatible with struct types.
       registerOperator(
           IS_NOT_NULL,
           SqlStdOperatorTable.IS_NOT_NULL,

docs/user/ppl/functions/condition.rst

Lines changed: 8 additions & 0 deletions
@@ -227,6 +227,14 @@ Argument type: all the supported data type, (NOTE : there is no comma before "el
 
 Return type: any
 
+Limitations
+>>>>>>>>>>>
+
+When each condition is a field comparison with a numeric literal and each result expression is a string literal, the query will be optimized as `range aggregations <https://docs.opensearch.org/latest/aggregations/bucket/range>`_ if pushdown optimization is enabled. However, this optimization has the following limitations:
+
+- Null values will not be grouped into any bucket of a range aggregation and will be ignored
+- The default ELSE clause will use the string literal ``"null"`` instead of actual NULL values
+
 Example::
 
     os> source=accounts | eval result = case(age > 35, firstname, age < 30, lastname else employer) | fields result, firstname, lastname, age, employer
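
Editor's sketch (not part of the commit): the Limitations text above can be illustrated with a small, self-contained Java example. It assumes the simplest pushable shape, an ordered list of `field < bound` branches with string results, and maps it to half-open buckets [from, to). The class `CaseToRanges` and its methods are hypothetical names for illustration only; they are not the `CaseRangeAnalyzer` added by this commit, whose actual logic is not shown on this page.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the CaseRangeAnalyzer from this commit. */
public class CaseToRanges {

  /** One bucket covering [from, to); a null bound means "unbounded on that side". */
  static final class Range {
    final String key;
    final Double from;
    final Double to;

    Range(String key, Double from, Double to) {
      this.key = key;
      this.from = from;
      this.to = to;
    }

    boolean contains(Double value) {
      // A missing (null) value matches no bucket, mirroring the documented limitation.
      if (value == null) {
        return false;
      }
      return (from == null || value >= from) && (to == null || value < to);
    }
  }

  /**
   * Turns ordered "field < bound" branches plus an ELSE label into buckets.
   * Assumes the bounds are strictly increasing.
   */
  static List<Range> fromUpperBounds(double[] bounds, String[] keys, String elseKey) {
    List<Range> ranges = new ArrayList<>();
    Double lower = null;
    for (int i = 0; i < bounds.length; i++) {
      ranges.add(new Range(keys[i], lower, bounds[i]));
      lower = bounds[i];
    }
    // The ELSE branch covers everything from the last bound upward.
    ranges.add(new Range(elseKey, lower, null));
    return ranges;
  }

  /** Returns the key of the bucket containing the value, or null if no bucket matches. */
  static String bucketOf(List<Range> ranges, Double value) {
    for (Range r : ranges) {
      if (r.contains(value)) {
        return r.key;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // case(age < 30, 'u30', age < 40, 'u40' else 'u100')
    List<Range> ranges =
        fromUpperBounds(new double[] {30, 40}, new String[] {"u30", "u40"}, "u100");
    System.out.println(bucketOf(ranges, 35.0));
    System.out.println(bucketOf(ranges, null));
  }
}
```

Under these assumptions a document whose field is missing lands in no bucket at all, which is why the documentation notes that null values are ignored rather than routed to the ELSE branch.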

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 125 additions & 16 deletions
@@ -10,6 +10,7 @@
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS;
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TIME_DATA;
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WEBLOGS;
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORKER;
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION;
@@ -18,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.Locale;
+import org.junit.Assume;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.opensearch.sql.ppl.ExplainIT;
@@ -512,22 +514,6 @@ public void testExplainStatsWithSubAggregation() throws IOException {
                 + " @timestamp, region"));
   }
 
-  @Test
-  public void bucketNullableNotSupportSubAggregation() throws IOException {
-    // TODO: Don't throw exception after addressing
-    // https://github.com/opensearch-project/sql/issues/4317
-    // When bucketNullable is true, sub aggregation is not supported. Hence we cannot pushdown the
-    // aggregation in this query. Caused by issue
-    // https://github.com/opensearch-project/sql/issues/4317,
-    // bin aggregation on timestamp field won't work if not been push down.
-    enabledOnlyWhenPushdownIsEnabled();
-    assertThrows(
-        Exception.class,
-        () ->
-            explainQueryToString(
-                "source=events | bin @timestamp bins=3 | stats count() by @timestamp, region"));
-  }
-
   @Test
   public void testExplainBinWithSpan() throws IOException {
     String expected = loadExpectedPlan("explain_bin_span.yaml");
@@ -1169,4 +1155,127 @@ public void testPushDownMinOrMaxAggOnDerivedField() throws IOException {
                     + "| stats MIN(balance2), MAX(balance2)",
                 TEST_INDEX_ACCOUNT)));
   }
+
+  @Test
+  public void testCasePushdownAsRangeQueryExplain() throws IOException {
+    // CASE 1: Range - Metric
+    // 1.1 Range - Metric
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_range_metric_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100') |"
+                    + " stats avg(age) as avg_age by age_range",
+                TEST_INDEX_BANK)));
+
+    // 1.2 Range - Metric (COUNT)
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_range_count_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age < 40, 'u40'"
+                    + " else 'u100') | stats avg(age) by age_range",
+                TEST_INDEX_BANK)));
+
+    // 1.3 Range - Range - Metric
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_range_range_metric_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100'),"
+                    + " balance_range = case(balance < 20000, 'medium' else 'high') | stats"
+                    + " avg(balance) as avg_balance by age_range, balance_range",
+                TEST_INDEX_BANK)));
+
+    // 1.4 Range - Metric (With null & discontinuous ranges)
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_range_metric_complex_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30', (age >= 35 and age < 40) or age"
+                    + " >= 80, '30-40 or >=80') | stats avg(balance) by age_range",
+                TEST_INDEX_BANK)));
+
+    // 1.5 Should not be pushed because the range is not closed-open
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_case_cannot_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age <= 40, 'u40'"
+                    + " else 'u100') | stats avg(age) as avg_age by age_range",
+                TEST_INDEX_BANK)));
+
+    // 1.6 Should not be pushed as range query because the result expression is not a string
+    // literal.
+    // Range aggregation keys must be strings
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_case_num_res_cannot_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 30 else 100) | stats count() by"
+                    + " age_range",
+                TEST_INDEX_BANK)));
+
+    // CASE 2: Composite - Range - Metric
+    // 2.1 Composite (term) - Range - Metric
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_composite_range_metric_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats avg(balance)"
+                    + " by state, age_range",
+                TEST_INDEX_BANK)));
+
+    // 2.2 Composite (date histogram) - Range - Metric
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_composite_date_range_push.yaml"),
+        explainQueryYaml(
+            "source=opensearch-sql_test_index_time_data | eval value_range = case(value < 7000,"
+                + " 'small' else 'large') | stats avg(value) by value_range, span(@timestamp,"
+                + " 1h)"));
+
+    // 2.3 Composite(2 fields) - Range - Metric (with count)
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_composite2_range_count_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats"
+                    + " avg(balance), count() by age_range, state, gender",
+                TEST_INDEX_BANK)));
+
+    // 2.4 Composite (2 fields) - Range - Range - Metric (with count)
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_composite2_range_range_count_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 35, 'u35' else 'a35'), balance_range ="
+                    + " case(balance < 20000, 'medium' else 'high') | stats avg(balance) as"
+                    + " avg_balance by age_range, balance_range, state",
+                TEST_INDEX_BANK)));
+
+    // 2.5 Should not be pushed down as range query because case result expression is not constant
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_case_composite_cannot_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | eval age_range = case(age < 35, 'u35' else email) | stats avg(balance)"
+                    + " as avg_balance by age_range, state",
+                TEST_INDEX_BANK)));
+  }
+
+  @Test
+  public void testNestedAggregationsExplain() throws IOException {
+    // TODO: Remove after resolving: https://github.com/opensearch-project/sql/issues/4578
+    Assume.assumeFalse(
+        "The query runs into error when pushdown is disabled due to bin's implementation",
+        isPushdownDisabled());
+    assertYamlEqualsIgnoreId(
+        loadExpectedPlan("agg_composite_autodate_range_metric_push.yaml"),
+        explainQueryYaml(
+            String.format(
+                "source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
+                    + " else 'great') | stats bucket_nullable=false avg(value), count() by"
+                    + " timestamp, value_range, category",
+                TEST_INDEX_TIME_DATA)));
+  }
 }
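
Editor's sketch (not part of the commit): test cases 1.2 and 1.5 above differ only in the upper comparison (`age < 40` versus `age <= 40`), and only the first is expected to push down. A range aggregation bucket includes its `from` value and excludes its `to` value, so a two-sided condition can be expressed as a bucket only when it is closed on the lower side and open on the upper side. The check below is a minimal, hypothetical illustration of that rule; the enum `Op` and the method `isClosedOpen` are invented names and do not reflect how the commit's analyzer is actually written.

```java
/** Hypothetical illustration only; names are not taken from the commit. */
public class ClosedOpenCheck {

  /** Comparison operators that can appear in a simple bound condition. */
  enum Op {
    LT,
    LE,
    GT,
    GE
  }

  /**
   * Returns true when "field lowerOp from AND field upperOp to" describes [from, to),
   * i.e. the lower bound is inclusive (>=), the upper bound is exclusive (<),
   * and the interval is non-empty.
   */
  static boolean isClosedOpen(Op lowerOp, double from, Op upperOp, double to) {
    return lowerOp == Op.GE && upperOp == Op.LT && from < to;
  }

  public static void main(String[] args) {
    // age >= 30 and age < 40: fits a [30, 40) bucket (as in case 1.2)
    System.out.println(isClosedOpen(Op.GE, 30, Op.LT, 40));
    // age >= 30 and age <= 40: upper bound is inclusive, no matching bucket (as in case 1.5)
    System.out.println(isClosedOpen(Op.GE, 30, Op.LE, 40));
  }
}
```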
