diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
index 7cec960b82a..cbdd0a05234 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
@@ -1809,18 +1809,16 @@ public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
   }
 
   /**
-   * Finds the timestamp field for multisearch ordering.
+   * Finds the @timestamp field for multisearch ordering. Only the @timestamp field is used for
+   * timestamp interleaving. Other timestamp-like fields are ignored.
    *
-   * @param rowType The row type to search for timestamp fields
-   * @return The name of the timestamp field, or null if not found
+   * @param rowType The row type to search for the @timestamp field
+   * @return "@timestamp" if the field exists, or null if not found
    */
   private String findTimestampField(RelDataType rowType) {
-    String[] candidates = {"@timestamp", "_time", "timestamp", "time"};
-    for (String fieldName : candidates) {
-      RelDataTypeField field = rowType.getField(fieldName, false, false);
-      if (field != null) {
-        return fieldName;
-      }
+    RelDataTypeField field = rowType.getField("@timestamp", false, false);
+    if (field != null) {
+      return "@timestamp";
     }
     return null;
   }
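
For reviewers, a minimal standalone sketch (not part of this change) of what the narrowed lookup implies, written against Calcite's public type-factory API; the class name and row layouts are illustrative only:

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
    import org.apache.calcite.sql.type.SqlTypeName;

    class FindTimestampFieldSketch {
      public static void main(String[] args) {
        RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        // A row type carrying the canonical field: the lookup succeeds.
        RelDataType withCanonical =
            typeFactory.builder().add("@timestamp", SqlTypeName.TIMESTAMP).build();
        // A row type carrying only an alias: previously matched through the
        // candidates array, now intentionally ignored for interleaving.
        RelDataType withAliasOnly =
            typeFactory.builder().add("_time", SqlTypeName.TIMESTAMP).build();
        System.out.println(withCanonical.getField("@timestamp", false, false) != null); // true
        System.out.println(withAliasOnly.getField("@timestamp", false, false) != null); // false
      }
    }
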
diff --git a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java
index 627d1de8dc4..05380ce8c48 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java
@@ -7,29 +7,27 @@
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
 
 /**
- * Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses
- * the same strategy as append command - renames conflicting fields to avoid type conflicts.
+ * Utility class for unifying schemas across multiple RelNodes. Throws an exception when type
+ * conflicts are detected.
  */
 public class SchemaUnifier {
 
   /**
-   * Builds a unified schema for multiple nodes with type conflict resolution.
+   * Builds a unified schema for multiple nodes. Throws an exception if type conflicts are detected.
    *
    * @param nodes List of RelNodes to unify schemas for
    * @param context Calcite plan context
    * @return List of projected RelNodes with unified schema
+   * @throws IllegalArgumentException if type conflicts are detected
    */
   public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
       List<RelNode> nodes, CalcitePlanContext context) {
@@ -41,7 +39,7 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
       return nodes;
     }
 
-    // Step 1: Build the unified schema by processing all nodes
+    // Step 1: Build the unified schema by processing all nodes (throws on conflict)
     List<SchemaField> unifiedSchema = buildUnifiedSchema(nodes);
 
     // Step 2: Create projections for each node to align with unified schema
@@ -55,47 +53,37 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
       projectedNodes.add(projectedNode);
     }
 
-    // Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.)
-    List<String> uniqueNames =
-        SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true);
-
-    // Step 4: Re-project with unique names if needed
-    if (!uniqueNames.equals(fieldNames)) {
-      List<RelNode> renamedNodes = new ArrayList<>();
-      for (RelNode node : projectedNodes) {
-        RelNode renamedNode =
-            context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build();
-        renamedNodes.add(renamedNode);
-      }
-      return renamedNodes;
-    }
-
     return projectedNodes;
   }
 
   /**
-   * Builds a unified schema by merging fields from all nodes. Fields with the same name but
-   * different types are added as separate entries (which will be renamed during uniquification).
+   * Builds a unified schema by merging fields from all nodes. Throws an exception if fields with
+   * the same name have different types.
    *
    * @param nodes List of RelNodes to merge schemas from
-   * @return List of SchemaField representing the unified schema (may contain duplicate names)
+   * @return List of SchemaField representing the unified schema
+   * @throws IllegalArgumentException if type conflicts are detected
    */
   private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
     List<SchemaField> schema = new ArrayList<>();
-    Map<String, Set<RelDataType>> seenFields = new HashMap<>();
+    Map<String, RelDataType> seenFields = new HashMap<>();
 
     for (RelNode node : nodes) {
      for (RelDataTypeField field : node.getRowType().getFieldList()) {
         String fieldName = field.getName();
         RelDataType fieldType = field.getType();
 
-        // Track which (name, type) combinations we've seen
-        Set<RelDataType> typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>());
-
-        if (!typesForName.contains(fieldType)) {
-          // New field or same name with different type - add to schema
+        RelDataType existingType = seenFields.get(fieldName);
+        if (existingType == null) {
+          // New field - add to schema
           schema.add(new SchemaField(fieldName, fieldType));
-          typesForName.add(fieldType);
+          seenFields.put(fieldName, fieldType);
+        } else if (!areTypesCompatible(existingType, fieldType)) {
+          // Same field name but different type - throw exception
+          throw new IllegalArgumentException(
+              String.format(
+                  "Unable to process column '%s' due to incompatible types: '%s' and '%s'",
+                  fieldName, existingType.getSqlTypeName(), fieldType.getSqlTypeName()));
         }
         // If we've seen this exact (name, type) combination, skip it
       }
@@ -104,6 +92,10 @@ private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
     return schema;
   }
 
+  private static boolean areTypesCompatible(RelDataType type1, RelDataType type2) {
+    return type1.getSqlTypeName() != null && type1.getSqlTypeName().equals(type2.getSqlTypeName());
+  }
+
   /**
    * Builds a projection for a node to align with the unified schema. For each field in the unified
    * schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL
@@ -125,8 +117,8 @@ private static List<RexNode> buildProjectionForNode(
       RelDataType expectedType = schemaField.getType();
       RelDataTypeField nodeField = nodeFieldMap.get(fieldName);
 
-      if (nodeField != null && nodeField.getType().equals(expectedType)) {
-        // Field exists with matching type - use it
+      if (nodeField != null && areTypesCompatible(nodeField.getType(), expectedType)) {
+        // Field exists with compatible type - use it
         projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex()));
       } else {
         // Field missing or type mismatch - project NULL
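
The new compatibility check is intentionally coarse, comparing only the SqlTypeName. A standalone sketch of those semantics (not part of this diff; it assumes Calcite's SqlTypeFactoryImpl, and the helper below is a copy of areTypesCompatible for illustration, not the class itself):

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
    import org.apache.calcite.sql.type.SqlTypeName;

    class StrictUnifierSketch {
      // Copy of SchemaUnifier.areTypesCompatible: equality on SqlTypeName only.
      static boolean areTypesCompatible(RelDataType a, RelDataType b) {
        return a.getSqlTypeName() != null && a.getSqlTypeName().equals(b.getSqlTypeName());
      }

      public static void main(String[] args) {
        RelDataTypeFactory tf = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        // Same SqlTypeName, different precision: compatible, unifies silently.
        System.out.println(
            areTypesCompatible(
                tf.createSqlType(SqlTypeName.VARCHAR, 10),
                tf.createSqlType(SqlTypeName.VARCHAR, 14))); // true
        // Different type families: this is the case that now fails the query.
        RelDataType bigint = tf.createSqlType(SqlTypeName.BIGINT);
        RelDataType varchar = tf.createSqlType(SqlTypeName.VARCHAR, 20);
        if (!areTypesCompatible(bigint, varchar)) {
          // Shape of the error the unifier raises for a clashing field "age".
          System.out.printf(
              "Unable to process column '%s' due to incompatible types: '%s' and '%s'%n",
              "age", bigint.getSqlTypeName(), varchar.getSqlTypeName());
        }
      }
    }

One reviewer-facing consequence: VARCHAR(10) and VARCHAR(14) unify to the first-seen type without complaint, while BIGINT versus VARCHAR aborts the whole query.
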
diff --git a/docs/category.json b/docs/category.json
index 49529b08bdc..d9605598800 100644
--- a/docs/category.json
+++ b/docs/category.json
@@ -40,6 +40,7 @@
     "user/ppl/cmd/rare.rst",
     "user/ppl/cmd/regex.rst",
     "user/ppl/cmd/rename.rst",
+    "user/ppl/cmd/multisearch.rst",
     "user/ppl/cmd/replace.rst",
     "user/ppl/cmd/rex.rst",
     "user/ppl/cmd/search.rst",
diff --git a/docs/user/ppl/cmd/append.rst b/docs/user/ppl/cmd/append.rst
index 982e0e33024..25303aeb87b 100644
--- a/docs/user/ppl/cmd/append.rst
+++ b/docs/user/ppl/cmd/append.rst
@@ -24,6 +24,11 @@ append
 
 * sub-search: mandatory. Executes PPL commands as a secondary search.
 
+Limitations
+===========
+
+* **Schema Compatibility**: When fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns).
+
 Example 1: Append rows from a count aggregation to existing search result
 ===============================================================
 
@@ -64,23 +69,3 @@ PPL query::
     | 101 | M      | null  |
     +-----+--------+-------+
 
-Example 3: Append rows with column type conflict
-=============================================
-
-This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result.
-
-PPL query::
-
-    os> source=accounts | stats sum(age) as sum by gender, state | sort -sum | head 5 | append [ source=accounts | stats sum(age) as sum by gender | eval sum = cast(sum as double) ];
-    fetched rows / total rows = 6/6
-    +------+--------+-------+-------+
-    | sum  | gender | state | sum0  |
-    |------+--------+-------+-------|
-    | 36   | M      | TN    | null  |
-    | 33   | M      | MD    | null  |
-    | 32   | M      | IL    | null  |
-    | 28   | F      | VA    | null  |
-    | null | F      | null  | 28.0  |
-    | null | M      | null  | 101.0 |
-    +------+--------+-------+-------+
-
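
The new Limitations entry suggests a workaround that could be exercised with a test in the style of this repo's IT files. The method below is hypothetical (it is not in this diff and reuses the helpers visible in CalcitePPLAppendCommandIT); it assumes ``cast(... as double)`` yields DOUBLE on both branches so the unifier sees a single SqlTypeName for ``sum``:

    // Hypothetical companion to testAppendWithConflictTypeColumn: apply the
    // documented workaround by casting both branches to a common type.
    @Test
    public void testAppendAvoidsConflictByCastingBothBranches() throws IOException {
      JSONObject actual =
          executeQuery(
              String.format(
                  Locale.ROOT,
                  "source=%s | stats sum(age) as sum by gender | eval sum = cast(sum as double)"
                      + " | append [ source=%s | stats sum(age) as sum by state"
                      + " | eval sum = cast(sum as double) ]",
                  TEST_INDEX_ACCOUNT,
                  TEST_INDEX_ACCOUNT));
      // A single 'sum' column survives; no renamed 'sum0' and no exception.
      verifySchemaInOrder(
          actual,
          schema("sum", "double"),
          schema("gender", "string"),
          schema("state", "string"));
    }
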
diff --git a/docs/user/ppl/cmd/multisearch.rst b/docs/user/ppl/cmd/multisearch.rst
index 10820badc54..2bac577ef23 100644
--- a/docs/user/ppl/cmd/multisearch.rst
+++ b/docs/user/ppl/cmd/multisearch.rst
@@ -30,10 +30,6 @@ Description
 * **A/B Testing Analysis**: Combine results from different test groups for comparison
 * **Time-series Data Merging**: Interleave events from multiple sources based on timestamps
 
-Version
-=======
-3.3.0
-
 Syntax
 ======
  | multisearch ...
@@ -59,7 +55,7 @@ Limitations
 ===========
 
 * **Minimum Subsearches**: At least two subsearches must be specified
-* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the system automatically resolves conflicts by renaming the conflicting fields. The first occurrence retains the original name, while subsequent conflicting fields are renamed with a numeric suffix (e.g., ``age`` becomes ``age0``, ``age1``, etc.). This ensures all data is preserved while maintaining schema consistency.
+* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type across all subsearches, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns).
 
 Usage
 =====
@@ -84,8 +80,8 @@ PPL query::
     |-----------+-----+-----------|
     | Nanette   | 28  | young     |
     | Amber     | 32  | adult     |
+    | Dale      | 33  | adult     |
     | Hattie    | 36  | adult     |
-    | Dale      | 37  | adult     |
     +-----------+-----+-----------+
 
 Example 2: Success Rate Pattern
 ===============================
 
@@ -97,14 +93,14 @@ PPL query::
 
     os> | multisearch [search source=accounts | where balance > 20000 | eval query_type = "high_balance" | fields firstname, balance, query_type] [search source=accounts | where balance > 0 AND balance <= 20000 | eval query_type = "regular" | fields firstname, balance, query_type] | sort balance desc;
     fetched rows / total rows = 4/4
-    +-----------+---------+-------------+
-    | firstname | balance | query_type  |
-    |-----------+---------+-------------|
-    | Amber     | 39225   | high_balance|
-    | Nanette   | 32838   | high_balance|
-    | Hattie    | 5686    | regular     |
-    | Dale      | 4180    | regular     |
-    +-----------+---------+-------------+
+    +-----------+---------+--------------+
+    | firstname | balance | query_type   |
+    |-----------+---------+--------------|
+    | Amber     | 39225   | high_balance |
+    | Nanette   | 32838   | high_balance |
+    | Hattie    | 5686    | regular      |
+    | Dale      | 4180    | regular      |
+    +-----------+---------+--------------+
 
 Example 3: Timestamp Interleaving
 ==================================
 
@@ -113,37 +109,19 @@ Combine time-series data from multiple sources with automatic timestamp-based or
 
 PPL query::
 
-    os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | head 5;
+    os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | fields @timestamp, category, value, timestamp | head 5;
     fetched rows / total rows = 5/5
-    +-------+---------------------+----------+-------+---------------------+
-    | index | @timestamp          | category | value | timestamp           |
-    |-------+---------------------+----------+-------+---------------------|
-    | null  | 2025-08-01 04:00:00 | E        | 2001  | 2025-08-01 04:00:00 |
-    | null  | 2025-08-01 03:47:41 | A        | 8762  | 2025-08-01 03:47:41 |
-    | null  | 2025-08-01 02:30:00 | F        | 2002  | 2025-08-01 02:30:00 |
-    | null  | 2025-08-01 01:14:11 | B        | 9015  | 2025-08-01 01:14:11 |
-    | null  | 2025-08-01 01:00:00 | E        | 2003  | 2025-08-01 01:00:00 |
-    +-------+---------------------+----------+-------+---------------------+
-
-Example 4: Handling Empty Results
-==================================
-
-Multisearch gracefully handles cases where some subsearches return no results.
-
-PPL query::
-
-    os> | multisearch [search source=accounts | where age > 25 | fields firstname, age] [search source=accounts | where age > 200 | eval impossible = "yes" | fields firstname, age, impossible] | head 5;
-    fetched rows / total rows = 4/4
-    +-----------+-----+------------+
-    | firstname | age | impossible |
-    |-----------+-----+------------|
-    | Nanette   | 28  | null       |
-    | Amber     | 32  | null       |
-    | Hattie    | 36  | null       |
-    | Dale      | 37  | null       |
-    +-----------+-----+------------+
-
-Example 5: Type Compatibility - Missing Fields
+    +---------------------+----------+-------+---------------------+
+    | @timestamp          | category | value | timestamp           |
+    |---------------------+----------+-------+---------------------|
+    | 2025-08-01 04:00:00 | E        | 2001  | 2025-08-01 04:00:00 |
+    | 2025-08-01 03:47:41 | A        | 8762  | 2025-08-01 03:47:41 |
+    | 2025-08-01 02:30:00 | F        | 2002  | 2025-08-01 02:30:00 |
+    | 2025-08-01 01:14:11 | B        | 9015  | 2025-08-01 01:14:11 |
+    | 2025-08-01 01:00:00 | E        | 2003  | 2025-08-01 01:00:00 |
+    +---------------------+----------+-------+---------------------+
+
+Example 4: Type Compatibility - Missing Fields
 =================================================
 
 Demonstrate how missing fields are handled with NULL insertion.
 
@@ -157,26 +135,7 @@ PPL query::
     |-----------+-----+------------|
     | Nanette   | 28  | yes        |
     | Amber     | 32  | null       |
+    | Dale      | 33  | null       |
     | Hattie    | 36  | null       |
-    | Dale      | 37  | null       |
     +-----------+-----+------------+
 
-Example 6: Type Conflict Resolution - Automatic Renaming
-===========================================================
-
-When the same field name has incompatible types across subsearches, the system automatically renames conflicting fields with numeric suffixes.
-
-PPL query::
-
-    os> | multisearch [search source=accounts | fields firstname, age, balance | head 2] [search source=locations | fields description, age, place_id | head 2];
-    fetched rows / total rows = 4/4
-    +-----------+------+---------+------------------+--------+----------+
-    | firstname | age  | balance | description      | age0   | place_id |
-    |-----------+------+---------+------------------+--------+----------|
-    | Amber     | 32   | 39225   | null             | null   | null     |
-    | Hattie    | 36   | 5686    | null             | null   | null     |
-    | null      | null | null    | Central Park     | old    | 1001     |
-    | null      | null | null    | Times Square     | modern | 1002     |
-    +-----------+------+---------+------------------+--------+----------+
-
-In this example, the ``age`` field has type ``bigint`` in accounts but type ``string`` in locations. The system keeps the first occurrence as ``age`` (bigint) and renames the second occurrence to ``age0`` (string), preserving all data while avoiding type conflicts.
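
For the multisearch side, the same workaround applies per subsearch. A hypothetical companion test (not in this diff) in the style of CalciteMultisearchCommandIT, assuming that casting ``age`` to string in every subsearch gives the unifier one SqlTypeName to agree on:

    // Hypothetical counterpart to testMultisearchTypeConflictWithStats: align
    // the clashing 'age' field across subsearches instead of expecting a failure.
    @Test
    public void testMultisearchAvoidsConflictByCasting() throws IOException {
      JSONObject result =
          executeQuery(
              String.format(
                  "| multisearch "
                      + "[search source=%s | eval age = cast(age as string) | fields age] "
                      + "[search source=%s | eval age = cast(age as string) | fields age] "
                      + "| stats count() as total",
                  TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT));
      verifySchema(result, schema("total", null, "bigint"));
    }
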
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java
index 72772c1cd84..393b0a4a501 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java
@@ -210,8 +210,8 @@ public void testMultisearchWithDifferentIndicesSchemaMerge() throws IOException
         executeQuery(
             String.format(
                 "| multisearch [search source=%s | where age > 35 | fields account_number,"
-                    + " firstname, age, balance] [search source=%s | where age > 35 | fields"
-                    + " account_number, balance, age] | stats count() as total_count",
+                    + " firstname, balance] [search source=%s | where age > 35 | fields"
+                    + " account_number, balance] | stats count() as total_count",
                 TEST_INDEX_ACCOUNT, TEST_INDEX_BANK));
 
     verifySchema(result, schema("total_count", null, "bigint"));
@@ -300,30 +300,23 @@ public void testMultisearchNullFillingAcrossIndices() throws IOException {
   }
 
   @Test
-  public void testMultisearchWithDirectTypeConflict() throws IOException {
-    JSONObject result =
-        executeQuery(
-            String.format(
-                "| multisearch "
-                    + "[search source=%s | fields firstname, age, balance | head 2] "
-                    + "[search source=%s | fields description, age, place_id | head 2]",
-                TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT));
-
-    verifySchema(
-        result,
-        schema("firstname", null, "string"),
-        schema("age", null, "bigint"),
-        schema("balance", null, "bigint"),
-        schema("description", null, "string"),
-        schema("age0", null, "string"),
-        schema("place_id", null, "int"));
+  public void testMultisearchWithDirectTypeConflict() {
+    Exception exception =
+        assertThrows(
+            ResponseException.class,
+            () ->
+                executeQuery(
+                    String.format(
+                        "| multisearch "
+                            + "[search source=%s | fields firstname, age, balance | head 2] "
+                            + "[search source=%s | fields description, age, place_id | head 2]",
+                        TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)));
 
-    verifyDataRows(
-        result,
-        rows("Amber", 32L, 39225L, null, null, null),
-        rows("Hattie", 36L, 5686L, null, null, null),
-        rows(null, null, null, "Central Park", "old", 1001),
-        rows(null, null, null, "Times Square", "modern", 1002));
+    assertTrue(
+        "Error message should indicate type conflict",
+        exception
+            .getMessage()
+            .contains("Unable to process column 'age' due to incompatible types:"));
   }
 
   @Test
@@ -352,18 +345,23 @@ public void testMultisearchCrossIndexFieldSelection() throws IOException {
   }
 
   @Test
-  public void testMultisearchTypeConflictWithStats() throws IOException {
-    JSONObject result =
-        executeQuery(
-            String.format(
-                "| multisearch "
-                    + "[search source=%s | fields age] "
-                    + "[search source=%s | fields age] "
-                    + "| stats count() as total",
-                TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT));
-
-    verifySchema(result, schema("total", null, "bigint"));
+  public void testMultisearchTypeConflictWithStats() {
+    Exception exception =
+        assertThrows(
+            ResponseException.class,
+            () ->
+                executeQuery(
+                    String.format(
+                        "| multisearch "
+                            + "[search source=%s | fields age] "
+                            + "[search source=%s | fields age] "
+                            + "| stats count() as total",
+                        TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)));
 
-    verifyDataRows(result, rows(1010L));
+    assertTrue(
+        "Error message should indicate type conflict",
+        exception
+            .getMessage()
+            .contains("Unable to process column 'age' due to incompatible types:"));
   }
 }
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java
index bc1e11a908c..d01ddfb2a44 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java
@@ -19,6 +19,7 @@
 import java.util.Locale;
 import org.json.JSONObject;
 import org.junit.Test;
+import org.opensearch.client.ResponseException;
 import org.opensearch.sql.common.setting.Settings;
 import org.opensearch.sql.ppl.PPLIntegTestCase;
 
@@ -215,28 +216,25 @@ public void testAppendWithMergedColumn() throws IOException {
   }
 
   @Test
-  public void testAppendWithConflictTypeColumn() throws IOException {
-    JSONObject actual =
-        executeQuery(
-            String.format(
-                Locale.ROOT,
-                "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats sum(age)"
-                    + " as sum by state | sort sum | eval sum = cast(sum as double) ] | head 5",
-                TEST_INDEX_ACCOUNT,
-                TEST_INDEX_ACCOUNT));
-    verifySchemaInOrder(
-        actual,
-        schema("sum", "bigint"),
-        schema("gender", "string"),
-        schema("state", "string"),
-        schema("sum0", "double"));
-    verifyDataRows(
-        actual,
-        rows(14947, "F", null, null),
-        rows(15224, "M", null, null),
-        rows(null, null, "NV", 369d),
-        rows(null, null, "NM", 412d),
-        rows(null, null, "AZ", 414d));
+  public void testAppendWithConflictTypeColumn() {
+    Exception exception =
+        assertThrows(
+            ResponseException.class,
+            () ->
+                executeQuery(
+                    String.format(
+                        Locale.ROOT,
+                        "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats"
+                            + " sum(age) as sum by state | sort sum | eval sum = cast(sum as"
+                            + " double) ] | head 5",
+                        TEST_INDEX_ACCOUNT,
+                        TEST_INDEX_ACCOUNT)));
+
+    assertTrue(
+        "Error message should indicate type conflict",
+        exception
+            .getMessage()
+            .contains("Unable to process column 'sum' due to incompatible types:"));
   }
 
   @Test
@@ -245,7 +243,7 @@ public void testAppendSchemaMergeWithTimestampUDT() throws IOException {
         executeQuery(
             String.format(
                 Locale.ROOT,
-                "source=%s | fields account_number, age | append [ source=%s | fields"
+                "source=%s | fields account_number, firstname | append [ source=%s | fields"
                     + " account_number, age, birthdate ] | where isnotnull(birthdate) and"
                     + " account_number > 30",
                 TEST_INDEX_ACCOUNT,
@@ -253,8 +251,8 @@ public void testAppendSchemaMergeWithTimestampUDT() throws IOException {
     verifySchemaInOrder(
         actual,
         schema("account_number", "bigint"),
-        schema("age", "bigint"),
-        schema("age0", "int"),
+        schema("firstname", "string"),
+        schema("age", "int"),
         schema("birthdate", "string"));
     verifyDataRows(actual, rows(32, null, 34, "2018-08-11 00:00:00"));
   }
diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java
index 614ed2ec32d..a163af186d5 100644
--- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java
+++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java
@@ -9,6 +9,7 @@
 import java.util.List;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.test.CalciteAssert;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class CalcitePPLAppendTest extends CalcitePPLAbstractTest {
@@ -71,15 +72,16 @@ public void testAppendEmptySearchCommand() {
   @Test
   public void testAppendNested() {
     String ppl =
-        "source=EMP | append [ | where DEPTNO = 10 | append [ source=EMP | where DEPTNO = 20 ] ]";
+        "source=EMP | fields ENAME, SAL | append [ | append [ source=EMP | where DEPTNO = 20 ] ]";
     RelNode root = getRelNode(ppl);
 
     String expectedLogical =
         "LogicalUnion(all=[true])\n"
-            + "  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
-            + " SAL=[$5], COMM=[$6], DEPTNO=[$7], EMPNO0=[null:SMALLINT])\n"
+            + "  LogicalProject(ENAME=[$1], SAL=[$5], EMPNO=[null:SMALLINT], JOB=[null:VARCHAR(9)],"
+            + " MGR=[null:SMALLINT], HIREDATE=[null:DATE], COMM=[null:DECIMAL(7, 2)],"
+            + " DEPTNO=[null:TINYINT])\n"
             + "    LogicalTableScan(table=[[scott, EMP]])\n"
-            + "  LogicalProject(EMPNO=[null:SMALLINT], ENAME=[$1], JOB=[$2], MGR=[$3],"
-            + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], EMPNO0=[$0])\n"
+            + "  LogicalProject(ENAME=[$1], SAL=[$5], EMPNO=[$0], JOB=[$2], MGR=[$3],"
+            + " HIREDATE=[$4], COMM=[$6], DEPTNO=[$7])\n"
             + "    LogicalUnion(all=[true])\n"
             + "      LogicalValues(tuples=[[]])\n"
             + "      LogicalFilter(condition=[=($7, 20)])\n"
@@ -88,12 +90,12 @@
     verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows
 
     String expectedSparkSql =
-        "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, CAST(NULL AS"
-            + " SMALLINT) `EMPNO0`\n"
+        "SELECT `ENAME`, `SAL`, CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `JOB`,"
+            + " CAST(NULL AS SMALLINT) `MGR`, CAST(NULL AS DATE) `HIREDATE`, CAST(NULL AS"
+            + " DECIMAL(7, 2)) `COMM`, CAST(NULL AS TINYINT) `DEPTNO`\n"
             + "FROM `scott`.`EMP`\n"
             + "UNION ALL\n"
-            + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`,"
-            + " `COMM`, `DEPTNO`, `EMPNO` `EMPNO0`\n"
+            + "SELECT `ENAME`, `SAL`, `EMPNO`, `JOB`, `MGR`, `HIREDATE`, `COMM`, `DEPTNO`\n"
             + "FROM (SELECT *\n"
             + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`,"
             + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n"
@@ -109,61 +111,63 @@ public void testAppendEmptySourceWithJoin() {
     List<String> emptySourceWithEmptySourceJoinPPLs =
         Arrays.asList(
-            "source=EMP | append [ | where DEPTNO = 10 | join on ENAME = DNAME DEPT ]",
-            "source=EMP | append [ | where DEPTNO = 10 | cross join on ENAME = DNAME DEPT ]",
-            "source=EMP | append [ | where DEPTNO = 10 | left join on ENAME = DNAME DEPT ]",
-            "source=EMP | append [ | where DEPTNO = 10 | semi join on ENAME = DNAME DEPT ]",
-            "source=EMP | append [ | where DEPTNO = 10 | anti join on ENAME = DNAME DEPT ]");
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | join on ENAME"
+                + " = DNAME DEPT ]",
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | cross join on"
+                + " ENAME = DNAME DEPT ]",
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | left join on"
+                + " ENAME = DNAME DEPT ]",
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | semi join on"
+                + " ENAME = DNAME DEPT ]",
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | anti join on"
+                + " ENAME = DNAME DEPT ]");
     for (String ppl : emptySourceWithEmptySourceJoinPPLs) {
       RelNode root = getRelNode(ppl);
       String expectedLogical =
           "LogicalUnion(all=[true])\n"
-              + "  LogicalTableScan(table=[[scott, EMP]])\n"
+              + "  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2])\n"
+              + "    LogicalTableScan(table=[[scott, EMP]])\n"
               + "  LogicalValues(tuples=[[]])\n";
       verifyLogical(root, expectedLogical);
       verifyResultCount(root, 14);
 
       String expectedSparkSql =
-          "SELECT *\n"
+          "SELECT `EMPNO`, `ENAME`, `JOB`\n"
              + "FROM `scott`.`EMP`\n"
              + "UNION ALL\n"
              + "SELECT *\n"
-              + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`,"
-              + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n"
+              + "FROM (VALUES (NULL, NULL, NULL)) `t` (`EMPNO`, `ENAME`, `JOB`)\n"
              + "WHERE 1 = 0";
       verifyPPLToSparkSQL(root, expectedSparkSql);
     }
 
     List<String> emptySourceWithRightOrFullJoinPPLs =
         Arrays.asList(
-            "source=EMP | append [ | where DEPTNO = 10 | right join on ENAME = DNAME DEPT ]",
-            "source=EMP | append [ | where DEPTNO = 10 | full join on ENAME = DNAME DEPT ]");
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | right join on"
+                + " ENAME = DNAME DEPT ]",
+            "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | full join on"
+                + " ENAME = DNAME DEPT ]");
     for (String ppl : emptySourceWithRightOrFullJoinPPLs) {
       RelNode root = getRelNode(ppl);
       String expectedLogical =
           "LogicalUnion(all=[true])\n"
-              + "  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
-              + " SAL=[$5], COMM=[$6], DEPTNO=[$7], DEPTNO0=[null:TINYINT],"
+              + "  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], DEPTNO=[null:TINYINT],"
               + " DNAME=[null:VARCHAR(14)], LOC=[null:VARCHAR(13)])\n"
               + "    LogicalTableScan(table=[[scott, EMP]])\n"
               + "  LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
-              + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
-              + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
-              + " DEPTNO0=[$0], DNAME=[$1], LOC=[$2])\n"
+              + " JOB=[null:VARCHAR(9)], DEPTNO=[$0], DNAME=[$1], LOC=[$2])\n"
               + "    LogicalTableScan(table=[[scott, DEPT]])\n";
       verifyLogical(root, expectedLogical);
 
       String expectedSparkSql =
-          "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, CAST(NULL AS"
-              + " TINYINT) `DEPTNO0`, CAST(NULL AS STRING) `DNAME`, CAST(NULL AS STRING) `LOC`\n"
+          "SELECT `EMPNO`, `ENAME`, `JOB`, CAST(NULL AS TINYINT) `DEPTNO`, CAST(NULL AS STRING)"
+              + " `DNAME`, CAST(NULL AS STRING) `LOC`\n"
              + "FROM `scott`.`EMP`\n"
              + "UNION ALL\n"
-              + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, CAST(NULL AS"
-              + " STRING) `JOB`, CAST(NULL AS SMALLINT) `MGR`, CAST(NULL AS DATE) `HIREDATE`,"
-              + " CAST(NULL AS DECIMAL(7, 2)) `SAL`, CAST(NULL AS DECIMAL(7, 2)) `COMM`, CAST(NULL"
-              + " AS TINYINT) `DEPTNO`, `DEPTNO` `DEPTNO0`, `DNAME`, `LOC`\n"
+              + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, CAST(NULL AS"
+              + " STRING) `JOB`, `DEPTNO`, `DNAME`, `LOC`\n"
               + "FROM `scott`.`DEPT`";
       verifyPPLToSparkSQL(root, expectedSparkSql);
     }
   }
 
@@ -172,15 +176,15 @@
   @Test
   public void testAppendDifferentIndex() {
     String ppl =
-        "source=EMP | fields EMPNO, DEPTNO | append [ source=DEPT | fields DEPTNO, DNAME | where"
+        "source=EMP | fields EMPNO, ENAME | append [ source=DEPT | fields DEPTNO, DNAME | where"
             + " DEPTNO = 20 ]";
     RelNode root = getRelNode(ppl);
 
     String expectedLogical =
         "LogicalUnion(all=[true])\n"
-            + "  LogicalProject(EMPNO=[$0], DEPTNO=[$7], DEPTNO0=[null:TINYINT],"
+            + "  LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[null:TINYINT],"
             + " DNAME=[null:VARCHAR(14)])\n"
             + "    LogicalTableScan(table=[[scott, EMP]])\n"
-            + "  LogicalProject(EMPNO=[null:SMALLINT], DEPTNO=[null:TINYINT], DEPTNO0=[$0],"
+            + "  LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)], DEPTNO=[$0],"
             + " DNAME=[$1])\n"
             + "    LogicalFilter(condition=[=($0, 20)])\n"
             + "      LogicalProject(DEPTNO=[$0], DNAME=[$1])\n"
             + "        LogicalTableScan(table=[[scott, DEPT]])\n";
     verifyLogical(root, expectedLogical);
 
     String expectedSparkSql =
-        "SELECT `EMPNO`, `DEPTNO`, CAST(NULL AS TINYINT) `DEPTNO0`, CAST(NULL AS STRING) `DNAME`\n"
+        "SELECT `EMPNO`, `ENAME`, CAST(NULL AS TINYINT) `DEPTNO`, CAST(NULL AS STRING) `DNAME`\n"
             + "FROM `scott`.`EMP`\n"
             + "UNION ALL\n"
-            + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS TINYINT) `DEPTNO`, `DEPTNO`"
-            + " `DEPTNO0`, `DNAME`\n"
+            + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, `DEPTNO`,"
+            + " `DNAME`\n"
             + "FROM (SELECT `DEPTNO`, `DNAME`\n"
             + "FROM `scott`.`DEPT`) `t0`\n"
             + "WHERE `DEPTNO` = 20";
@@ -227,22 +231,9 @@ public void testAppendWithConflictTypeColumn() {
     String ppl =
         "source=EMP | fields DEPTNO | append [ source=EMP | fields DEPTNO | eval DEPTNO = 20 ]";
-    RelNode root = getRelNode(ppl);
-    String expectedLogical =
-        "LogicalUnion(all=[true])\n"
-            + "  LogicalProject(DEPTNO=[$7], DEPTNO0=[null:INTEGER])\n"
-            + "    LogicalTableScan(table=[[scott, EMP]])\n"
-            + "  LogicalProject(DEPTNO=[null:TINYINT], DEPTNO0=[20])\n"
-            + "    LogicalTableScan(table=[[scott, EMP]])\n";
-    verifyLogical(root, expectedLogical);
-    verifyResultCount(root, 28);
-
-    String expectedSparkSql =
-        "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO0`\n"
-            + "FROM `scott`.`EMP`\n"
-            + "UNION ALL\n"
-            + "SELECT CAST(NULL AS TINYINT) `DEPTNO`, 20 `DEPTNO0`\n"
-            + "FROM `scott`.`EMP`";
-    verifyPPLToSparkSQL(root, expectedSparkSql);
+    Exception exception =
+        Assert.assertThrows(IllegalArgumentException.class, () -> getRelNode(ppl));
+    verifyErrorMessageContains(
+        exception, "Unable to process column 'DEPTNO' due to incompatible types:");
   }
 }
diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java
index e69030753f2..8746fe846e5 100644
--- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java
+++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java
@@ -142,30 +142,28 @@ public void testMultisearchCrossIndices() {
     // Test multisearch with different tables (indices)
     String ppl =
         "| multisearch [search source=EMP | where DEPTNO = 10 | fields EMPNO, ENAME,"
-            + " DEPTNO] [search source=DEPT | where DEPTNO = 10 | fields DEPTNO, DNAME | eval EMPNO"
-            + " = DEPTNO, ENAME = DNAME]";
+            + " JOB] [search source=DEPT | where DEPTNO = 10 | fields DEPTNO, DNAME, LOC]";
     RelNode root = getRelNode(ppl);
 
     String expectedLogical =
         "LogicalUnion(all=[true])\n"
-            + "  LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[$7], DEPTNO0=[null:TINYINT],"
-            + " DNAME=[null:VARCHAR(14)], EMPNO0=[null:TINYINT], ENAME0=[null:VARCHAR(14)])\n"
+            + "  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], DEPTNO=[null:TINYINT],"
+            + " DNAME=[null:VARCHAR(14)], LOC=[null:VARCHAR(13)])\n"
             + "    LogicalFilter(condition=[=($7, 10)])\n"
             + "      LogicalTableScan(table=[[scott, EMP]])\n"
-            + "  LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
-            + " DEPTNO=[null:TINYINT], DEPTNO0=[$0], DNAME=[$1], EMPNO0=[$0], ENAME0=[$1])\n"
+            + "  LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
+            + " JOB=[null:VARCHAR(9)], DEPTNO=[$0], DNAME=[$1], LOC=[$2])\n"
             + "    LogicalFilter(condition=[=($0, 10)])\n"
             + "      LogicalTableScan(table=[[scott, DEPT]])\n";
     verifyLogical(root, expectedLogical);
 
     String expectedSparkSql =
-        "SELECT `EMPNO`, `ENAME`, `DEPTNO`, CAST(NULL AS TINYINT) `DEPTNO0`, CAST(NULL AS STRING)"
-            + " `DNAME`, CAST(NULL AS TINYINT) `EMPNO0`, CAST(NULL AS STRING) `ENAME0`\n"
+        "SELECT `EMPNO`, `ENAME`, `JOB`, CAST(NULL AS TINYINT) `DEPTNO`, CAST(NULL AS STRING)"
+            + " `DNAME`, CAST(NULL AS STRING) `LOC`\n"
             + "FROM `scott`.`EMP`\n"
             + "WHERE `DEPTNO` = 10\n"
             + "UNION ALL\n"
             + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, CAST(NULL AS"
-            + " TINYINT) `DEPTNO`, `DEPTNO` `DEPTNO0`, `DNAME`, `DEPTNO` `EMPNO0`, `DNAME`"
-            + " `ENAME0`\n"
+            + " STRING) `JOB`, `DEPTNO`, `DNAME`, `LOC`\n"
             + "FROM `scott`.`DEPT`\n"
             + "WHERE `DEPTNO` = 10";
     verifyPPLToSparkSQL(root, expectedSparkSql);