Skip to content

Commit c0858b5

Browse files
authored
Add unified query API for external integration (#3783)
* Add api module with API and UT Signed-off-by: Chen Dai <[email protected]> * Refactor catalog API and clean up build.gradle Signed-off-by: Chen Dai <[email protected]> * Add cache schema API and refactor UT Signed-off-by: Chen Dai <[email protected]> * Add readme Signed-off-by: Chen Dai <[email protected]> * Add comment for hardcoding query size limit Signed-off-by: Chen Dai <[email protected]> * Add default namespace API with more UTs Signed-off-by: Chen Dai <[email protected]> --------- Signed-off-by: Chen Dai <[email protected]>
1 parent 4f3cbc5 commit c0858b5

File tree

6 files changed

+554
-1
lines changed

6 files changed

+554
-1
lines changed

api/README.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Unified Query API
2+
3+
This module provides a high-level integration layer for the Calcite-based query engine, enabling external systems such as Apache Spark or command-line tools to parse and analyze queries without exposing low-level internals.
4+
5+
## Overview
6+
7+
The `UnifiedQueryPlanner` serves as the primary entry point for external consumers. It accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as intermediate representation.
8+
9+
## Usage
10+
11+
Use the declarative, fluent builder API to initialize the `UnifiedQueryPlanner`.
12+
13+
```java
14+
UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder()
15+
.language(QueryType.PPL)
16+
.catalog("opensearch", schema)
17+
.defaultNamespace("opensearch")
18+
.cacheMetadata(true)
19+
.build();
20+
21+
RelNode plan = planner.plan("source = opensearch.test");
22+
```
23+
24+
## Development & Testing
25+
26+
A set of unit tests is provided to validate planner behavior.
27+
28+
To run tests:
29+
30+
```
31+
./gradlew :api:test
32+
```
33+
34+
## Integration Guide
35+
36+
This guide walks through how to integrate unified query planner into your application.
37+
38+
### Step 1: Add Dependency
39+
40+
The module is currently published as a snapshot to the AWS Sonatype Snapshots repository. To include it as a dependency in your project, add the following to your `pom.xml` or `build.gradle`:
41+
42+
```xml
43+
<dependency>
44+
<groupId>org.opensearch.query</groupId>
45+
<artifactId>unified-query-api</artifactId>
46+
<version>YOUR_VERSION_HERE</version>
47+
</dependency>
48+
```
49+
50+
### Step 2: Implement a Calcite Schema
51+
52+
You must implement the Calcite `Schema` interface and register them using the fluent `catalog()` method on the builder.
53+
54+
```java
55+
public class MySchema extends AbstractSchema {
56+
@Override
57+
protected Map<String, Table> getTableMap() {
58+
return Map.of(
59+
"test_table",
60+
new AbstractTable() {
61+
@Override
62+
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
63+
return typeFactory.createStructType(
64+
List.of(typeFactory.createSqlType(SqlTypeName.INTEGER)),
65+
List.of("id"));
66+
}
67+
});
68+
}
69+
}
70+
```
71+
72+
## Future Work
73+
74+
- Expand support to SQL language.
75+
- Extend planner to generate optimized physical plans using Calcite's optimization frameworks.

api/build.gradle

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
plugins {
7+
id 'java-library'
8+
id 'jacoco'
9+
id 'com.diffplug.spotless' version '6.22.0'
10+
}
11+
12+
dependencies {
13+
api project(':ppl')
14+
15+
testImplementation group: 'junit', name: 'junit', version: '4.13.2'
16+
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}"
17+
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
18+
testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.38.0'
19+
}
20+
21+
spotless {
22+
java {
23+
target fileTree('.') {
24+
include '**/*.java'
25+
exclude '**/build/**', '**/build-*/**', 'src/main/gen/**'
26+
}
27+
importOrder()
28+
removeUnusedImports()
29+
trimTrailingWhitespace()
30+
endWithNewline()
31+
googleJavaFormat('1.17.0').reflowLongStrings().groupArtifact('com.google.googlejavaformat:google-java-format')
32+
}
33+
}
34+
35+
test {
36+
testLogging {
37+
events "passed", "skipped", "failed"
38+
exceptionFormat "full"
39+
}
40+
}
41+
42+
jacocoTestReport {
43+
reports {
44+
html.required = true
45+
xml.required = true
46+
}
47+
afterEvaluate {
48+
classDirectories.setFrom(files(classDirectories.files.collect {
49+
fileTree(dir: it,
50+
exclude: ['**/antlr/parser/**'])
51+
}))
52+
}
53+
}
54+
test.finalizedBy(project.tasks.jacocoTestReport)
55+
jacocoTestCoverageVerification {
56+
violationRules {
57+
rule {
58+
limit {
59+
minimum = 0.9
60+
}
61+
62+
}
63+
}
64+
afterEvaluate {
65+
classDirectories.setFrom(files(classDirectories.files.collect {
66+
fileTree(dir: it,
67+
exclude: ['**/antlr/parser/**'])
68+
}))
69+
}
70+
}
71+
check.dependsOn jacocoTestCoverageVerification
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api;
7+
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Objects;
12+
import org.antlr.v4.runtime.tree.ParseTree;
13+
import org.apache.calcite.jdbc.CalciteSchema;
14+
import org.apache.calcite.plan.RelTraitDef;
15+
import org.apache.calcite.rel.RelCollation;
16+
import org.apache.calcite.rel.RelCollations;
17+
import org.apache.calcite.rel.RelNode;
18+
import org.apache.calcite.rel.core.Sort;
19+
import org.apache.calcite.rel.logical.LogicalSort;
20+
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
21+
import org.apache.calcite.schema.Schema;
22+
import org.apache.calcite.schema.SchemaPlus;
23+
import org.apache.calcite.sql.parser.SqlParser;
24+
import org.apache.calcite.tools.FrameworkConfig;
25+
import org.apache.calcite.tools.Frameworks;
26+
import org.apache.calcite.tools.Programs;
27+
import org.opensearch.sql.ast.statement.Query;
28+
import org.opensearch.sql.ast.statement.Statement;
29+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
30+
import org.opensearch.sql.calcite.CalcitePlanContext;
31+
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
32+
import org.opensearch.sql.common.antlr.Parser;
33+
import org.opensearch.sql.common.antlr.SyntaxCheckException;
34+
import org.opensearch.sql.executor.QueryType;
35+
import org.opensearch.sql.ppl.antlr.PPLSyntaxParser;
36+
import org.opensearch.sql.ppl.parser.AstBuilder;
37+
import org.opensearch.sql.ppl.parser.AstStatementBuilder;
38+
39+
/**
40+
* {@code UnifiedQueryPlanner} provides a high-level API for parsing and analyzing queries using the
41+
* Calcite-based query engine. It serves as the primary integration point for external consumers
42+
* such as Spark or command-line tools, abstracting away Calcite internals.
43+
*/
44+
public class UnifiedQueryPlanner {
45+
/** The type of query language being used (e.g., PPL). */
46+
private final QueryType queryType;
47+
48+
/** The parser instance responsible for converting query text into a parse tree. */
49+
private final Parser parser;
50+
51+
/** Calcite framework configuration used during logical plan construction. */
52+
private final FrameworkConfig config;
53+
54+
/** AST-to-RelNode visitor that builds logical plans from the parsed AST. */
55+
private final CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor();
56+
57+
/**
58+
* Constructs a UnifiedQueryPlanner for a given query type and schema root.
59+
*
60+
* @param queryType the query language type (e.g., PPL)
61+
* @param rootSchema the root Calcite schema containing all catalogs and tables
62+
* @param defaultPath dot-separated path of schema to set as default schema
63+
*/
64+
public UnifiedQueryPlanner(QueryType queryType, SchemaPlus rootSchema, String defaultPath) {
65+
this.queryType = queryType;
66+
this.parser = buildQueryParser(queryType);
67+
this.config = buildCalciteConfig(rootSchema, defaultPath);
68+
}
69+
70+
/**
71+
* Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate
72+
* optimal physical plan to fully unify query execution and leverage Calcite's optimizer.
73+
*
74+
* @param query the raw query string in PPL or other supported syntax
75+
* @return a logical plan representing the query
76+
*/
77+
public RelNode plan(String query) {
78+
try {
79+
return preserveCollation(analyze(parse(query)));
80+
} catch (SyntaxCheckException e) {
81+
// Re-throw syntax error without wrapping
82+
throw e;
83+
} catch (Exception e) {
84+
throw new IllegalStateException("Failed to plan query", e);
85+
}
86+
}
87+
88+
private Parser buildQueryParser(QueryType queryType) {
89+
if (queryType == QueryType.PPL) {
90+
return new PPLSyntaxParser();
91+
}
92+
throw new IllegalArgumentException("Unsupported query type: " + queryType);
93+
}
94+
95+
private FrameworkConfig buildCalciteConfig(SchemaPlus rootSchema, String defaultPath) {
96+
SchemaPlus defaultSchema = findSchemaByPath(rootSchema, defaultPath);
97+
return Frameworks.newConfigBuilder()
98+
.parserConfig(SqlParser.Config.DEFAULT)
99+
.defaultSchema(defaultSchema)
100+
.traitDefs((List<RelTraitDef>) null)
101+
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
102+
.build();
103+
}
104+
105+
private static SchemaPlus findSchemaByPath(SchemaPlus rootSchema, String defaultPath) {
106+
if (defaultPath == null) {
107+
return rootSchema;
108+
}
109+
110+
// Find schema by the path recursively
111+
SchemaPlus current = rootSchema;
112+
for (String part : defaultPath.split("\\.")) {
113+
current = current.getSubSchema(part);
114+
if (current == null) {
115+
throw new IllegalArgumentException("Invalid default catalog path: " + defaultPath);
116+
}
117+
}
118+
return current;
119+
}
120+
121+
private UnresolvedPlan parse(String query) {
122+
ParseTree cst = parser.parse(query);
123+
AstStatementBuilder astStmtBuilder =
124+
new AstStatementBuilder(
125+
new AstBuilder(query), AstStatementBuilder.StatementBuilderContext.builder().build());
126+
Statement statement = cst.accept(astStmtBuilder);
127+
128+
if (statement instanceof Query) {
129+
return ((Query) statement).getPlan();
130+
}
131+
throw new UnsupportedOperationException(
132+
"Only query statements are supported but got " + statement.getClass().getSimpleName());
133+
}
134+
135+
private RelNode analyze(UnresolvedPlan ast) {
136+
// TODO: Hardcoded query size limit (10000) for now as only logical plan is generated.
137+
CalcitePlanContext calcitePlanContext = CalcitePlanContext.create(config, 10000, queryType);
138+
return relNodeVisitor.analyze(ast, calcitePlanContext);
139+
}
140+
141+
private RelNode preserveCollation(RelNode logical) {
142+
RelNode calcitePlan = logical;
143+
RelCollation collation = logical.getTraitSet().getCollation();
144+
if (!(logical instanceof Sort) && collation != RelCollations.EMPTY) {
145+
calcitePlan = LogicalSort.create(logical, collation, null, null);
146+
}
147+
return calcitePlan;
148+
}
149+
150+
/** Builder for {@link UnifiedQueryPlanner}, supporting declarative fluent API. */
151+
public static Builder builder() {
152+
return new Builder();
153+
}
154+
155+
/**
156+
* Builder for {@link UnifiedQueryPlanner}, supporting both declarative and dynamic schema
157+
* registration for use in query planning.
158+
*/
159+
public static class Builder {
160+
private final Map<String, Schema> catalogs = new HashMap<>();
161+
private String defaultNamespace;
162+
private QueryType queryType;
163+
private boolean cacheMetadata;
164+
165+
/**
166+
* Sets the query language frontend to be used by the planner.
167+
*
168+
* @param queryType the {@link QueryType}, such as PPL
169+
* @return this builder instance
170+
*/
171+
public Builder language(QueryType queryType) {
172+
this.queryType = queryType;
173+
return this;
174+
}
175+
176+
/**
177+
* Registers a catalog with the specified name and its associated schema. The schema can be a
178+
* flat or nested structure (e.g., catalog → schema → table), depending on how data is
179+
* organized.
180+
*
181+
* @param name the name of the catalog to register
182+
* @param schema the schema representing the structure of the catalog
183+
* @return this builder instance
184+
*/
185+
public Builder catalog(String name, Schema schema) {
186+
catalogs.put(name, schema);
187+
return this;
188+
}
189+
190+
/**
191+
* Sets the default namespace path for resolving unqualified table names.
192+
*
193+
* @param namespace dot-separated path (e.g., "spark_catalog.default" or "opensearch")
194+
* @return this builder instance
195+
*/
196+
public Builder defaultNamespace(String namespace) {
197+
this.defaultNamespace = namespace;
198+
return this;
199+
}
200+
201+
/**
202+
* Enables or disables catalog metadata caching in the root schema.
203+
*
204+
* @param cache whether to enable metadata caching
205+
* @return this builder instance
206+
*/
207+
public Builder cacheMetadata(boolean cache) {
208+
this.cacheMetadata = cache;
209+
return this;
210+
}
211+
212+
/**
213+
* Builds a {@link UnifiedQueryPlanner} with the configuration.
214+
*
215+
* @return a new instance of {@link UnifiedQueryPlanner}
216+
*/
217+
public UnifiedQueryPlanner build() {
218+
Objects.requireNonNull(queryType, "Must specify language before build");
219+
SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheMetadata).plus();
220+
catalogs.forEach(rootSchema::add);
221+
return new UnifiedQueryPlanner(queryType, rootSchema, defaultNamespace);
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)