Skip to content

Commit a3f16c3

Browse files
committed
[GRPC] Add terms query and document proto refactoring
1 parent 18a3b75 commit a3f16c3

26 files changed

+833
-242
lines changed

plugins/transport-grpc/build.gradle

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ testClusters {
2121
}
2222
}
2323

24+
repositories {
25+
maven {
26+
url = 'https://aws.oss.sonatype.org/content/repositories/snapshots'
27+
}
28+
}
29+
2430
dependencies {
2531
compileOnly "com.google.code.findbugs:jsr305:3.0.2"
2632
runtimeOnly "com.google.guava:guava:${versions.guava}"
@@ -35,7 +41,8 @@ dependencies {
3541
implementation "io.grpc:grpc-stub:${versions.grpc}"
3642
implementation "io.grpc:grpc-util:${versions.grpc}"
3743
implementation "io.perfmark:perfmark-api:0.26.0"
38-
implementation "org.opensearch:protobufs:0.2.0"
44+
implementation "org.opensearch:protobufs:0.3.0-SNAPSHOT"
45+
// implementation "org.opensearch:protobufs:0.3.0"
3946
testImplementation project(':test:framework')
4047
}
4148

plugins/transport-grpc/licenses/protobufs-0.2.0.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
24643a46886694a8d34775a6fcc12ea0f3bee974
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.proto.request.common;
10+
11+
import org.opensearch.action.DocWriteRequest;
12+
import org.opensearch.protobufs.OpType;
13+
14+
/**
15+
* Utility class for converting SourceConfig Protocol Buffers to FetchSourceContext objects.
16+
* This class handles the conversion of Protocol Buffer representations to their
17+
* corresponding OpenSearch objects.
18+
*/
19+
public class OpTypeProtoUtils {
20+
21+
private OpTypeProtoUtils() {
22+
// Utility class, no instances
23+
}
24+
25+
/**
26+
*
27+
* Similar to {@link DocWriteRequest.OpType}
28+
*
29+
* @param opType
30+
* @return
31+
*/
32+
public static DocWriteRequest.OpType fromProto(OpType opType) {
33+
34+
switch (opType) {
35+
case OP_TYPE_CREATE:
36+
return DocWriteRequest.OpType.CREATE;
37+
case OP_TYPE_INDEX:
38+
return DocWriteRequest.OpType.INDEX;
39+
default:
40+
throw new UnsupportedOperationException("Invalid optype: " + opType);
41+
}
42+
}
43+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.proto.request.common;
10+
11+
import org.opensearch.action.support.WriteRequest;
12+
13+
/**
14+
* Utility class for converting SourceConfig Protocol Buffers to FetchSourceContext objects.
15+
* This class handles the conversion of Protocol Buffer representations to their
16+
* corresponding OpenSearch objects.
17+
*/
18+
public class RefreshProtoUtils {
19+
20+
private RefreshProtoUtils() {
21+
// Utility class, no instances
22+
}
23+
24+
/**
25+
* Extracts the refresh policy from the bulk request.
26+
*
27+
* @param refresh The bulk request containing the refresh policy
28+
* @return The refresh policy as a string, or null if not specified
29+
*/
30+
public static String getRefreshPolicy(org.opensearch.protobufs.Refresh refresh) {
31+
switch (refresh) {
32+
case REFRESH_TRUE:
33+
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
34+
case REFRESH_WAIT_FOR:
35+
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
36+
case REFRESH_FALSE:
37+
case REFRESH_UNSPECIFIED:
38+
default:
39+
return WriteRequest.RefreshPolicy.NONE.getValue();
40+
}
41+
}
42+
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/ActiveShardCountProtoUtils.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.plugin.transport.grpc.proto.request.document.bulk;
1010

1111
import org.opensearch.action.support.ActiveShardCount;
12-
import org.opensearch.protobufs.BulkRequest;
1312
import org.opensearch.protobufs.WaitForActiveShards;
1413

1514
/**
@@ -33,37 +32,26 @@ protected ActiveShardCountProtoUtils() {
3332
* the wait_for_active_shards parameter from the Protocol Buffer request and applies
3433
* the appropriate ActiveShardCount setting to the OpenSearch bulk request.
3534
*
36-
* @param bulkRequest The OpenSearch bulk request to modify
37-
* @param request The Protocol Buffer request containing the active shard count settings
38-
* @return The modified OpenSearch bulk request with updated active shard count settings
35+
* @param waitForActiveShards The protobuf object containing the active shard count
36+
* @return The modified bulk request
3937
*/
40-
public static org.opensearch.action.bulk.BulkRequest getActiveShardCount(
41-
org.opensearch.action.bulk.BulkRequest bulkRequest,
42-
BulkRequest request
43-
) {
44-
if (!request.hasWaitForActiveShards()) {
45-
return bulkRequest;
46-
}
47-
WaitForActiveShards waitForActiveShards = request.getWaitForActiveShards();
38+
public static ActiveShardCount parseProto(WaitForActiveShards waitForActiveShards) {
39+
4840
switch (waitForActiveShards.getWaitForActiveShardsCase()) {
4941
case WaitForActiveShards.WaitForActiveShardsCase.WAIT_FOR_ACTIVE_SHARD_OPTIONS:
5042
switch (waitForActiveShards.getWaitForActiveShardOptions()) {
5143
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED:
5244
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
5345
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_ALL:
54-
bulkRequest.waitForActiveShards(ActiveShardCount.ALL);
55-
break;
46+
return ActiveShardCount.ALL;
5647
default:
57-
bulkRequest.waitForActiveShards(ActiveShardCount.DEFAULT);
58-
break;
48+
return ActiveShardCount.DEFAULT;
5949
}
60-
break;
50+
// TODO fix this
6151
case WaitForActiveShards.WaitForActiveShardsCase.INT32_VALUE:
62-
bulkRequest.waitForActiveShards(waitForActiveShards.getInt32Value());
63-
break;
52+
return ActiveShardCount.from(waitForActiveShards.getInt32Value());
6453
default:
65-
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
54+
return ActiveShardCount.DEFAULT;
6655
}
67-
return bulkRequest;
6856
}
6957
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import org.opensearch.index.seqno.SequenceNumbers;
2424
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
2525
import org.opensearch.plugin.transport.grpc.proto.request.common.ScriptProtoUtils;
26+
import org.opensearch.plugin.transport.grpc.proto.response.document.common.VersionTypeProtoUtils;
2627
import org.opensearch.protobufs.BulkRequest;
2728
import org.opensearch.protobufs.BulkRequestBody;
2829
import org.opensearch.protobufs.CreateOperation;
2930
import org.opensearch.protobufs.DeleteOperation;
3031
import org.opensearch.protobufs.IndexOperation;
32+
import org.opensearch.protobufs.OpType;
3133
import org.opensearch.protobufs.UpdateOperation;
3234
import org.opensearch.script.Script;
3335
import org.opensearch.search.fetch.subphase.FetchSourceContext;
@@ -110,7 +112,7 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
110112
String id = null;
111113
String routing = valueOrDefault(defaultRouting, request.getRouting());
112114
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
113-
IndexOperation.OpType opType = null;
115+
OpType opType = null;
114116
long version = Versions.MATCH_ANY;
115117
VersionType versionType = VersionType.INTERNAL;
116118
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -226,17 +228,8 @@ public static IndexRequest buildCreateRequest(
226228
routing = createOperation.hasRouting() ? createOperation.getRouting() : routing;
227229
version = createOperation.hasVersion() ? createOperation.getVersion() : version;
228230
if (createOperation.hasVersionType()) {
229-
switch (createOperation.getVersionType()) {
230-
case VERSION_TYPE_EXTERNAL:
231-
versionType = VersionType.EXTERNAL;
232-
break;
233-
case VERSION_TYPE_EXTERNAL_GTE:
234-
versionType = VersionType.EXTERNAL_GTE;
235-
break;
236-
default:
237-
versionType = VersionType.INTERNAL;
238-
break;
239-
}
231+
versionType = VersionTypeProtoUtils.fromProto(createOperation.getVersionType());
232+
240233
}
241234
pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline;
242235
ifSeqNo = createOperation.hasIfSeqNo() ? createOperation.getIfSeqNo() : ifSeqNo;
@@ -276,7 +269,7 @@ public static IndexRequest buildCreateRequest(
276269
public static IndexRequest buildIndexRequest(
277270
IndexOperation indexOperation,
278271
byte[] document,
279-
IndexOperation.OpType opType,
272+
OpType opType,
280273
String index,
281274
String id,
282275
String routing,
@@ -293,17 +286,7 @@ public static IndexRequest buildIndexRequest(
293286
routing = indexOperation.hasRouting() ? indexOperation.getRouting() : routing;
294287
version = indexOperation.hasVersion() ? indexOperation.getVersion() : version;
295288
if (indexOperation.hasVersionType()) {
296-
switch (indexOperation.getVersionType()) {
297-
case VERSION_TYPE_EXTERNAL:
298-
versionType = VersionType.EXTERNAL;
299-
break;
300-
case VERSION_TYPE_EXTERNAL_GTE:
301-
versionType = VersionType.EXTERNAL_GTE;
302-
break;
303-
default:
304-
versionType = VersionType.INTERNAL;
305-
break;
306-
}
289+
versionType = VersionTypeProtoUtils.fromProto(indexOperation.getVersionType());
307290
}
308291
pipeline = indexOperation.hasPipeline() ? indexOperation.getPipeline() : pipeline;
309292
ifSeqNo = indexOperation.hasIfSeqNo() ? indexOperation.getIfSeqNo() : ifSeqNo;
@@ -326,7 +309,7 @@ public static IndexRequest buildIndexRequest(
326309
.routing(routing)
327310
.version(version)
328311
.versionType(versionType)
329-
.create(opType.equals(IndexOperation.OpType.OP_TYPE_CREATE))
312+
.create(opType.equals(OpType.OP_TYPE_CREATE))
330313
.setPipeline(pipeline)
331314
.setIfSeqNo(ifSeqNo)
332315
.setIfPrimaryTerm(ifPrimaryTerm)
@@ -487,17 +470,7 @@ public static DeleteRequest buildDeleteRequest(
487470
routing = deleteOperation.hasRouting() ? deleteOperation.getRouting() : routing;
488471
version = deleteOperation.hasVersion() ? deleteOperation.getVersion() : version;
489472
if (deleteOperation.hasVersionType()) {
490-
switch (deleteOperation.getVersionType()) {
491-
case VERSION_TYPE_EXTERNAL:
492-
versionType = VersionType.EXTERNAL;
493-
break;
494-
case VERSION_TYPE_EXTERNAL_GTE:
495-
versionType = VersionType.EXTERNAL_GTE;
496-
break;
497-
default:
498-
versionType = VersionType.INTERNAL;
499-
break;
500-
}
473+
versionType = VersionTypeProtoUtils.fromProto(deleteOperation.getVersionType());
501474
}
502475
ifSeqNo = deleteOperation.hasIfSeqNo() ? deleteOperation.getIfSeqNo() : ifSeqNo;
503476
ifPrimaryTerm = deleteOperation.hasIfPrimaryTerm() ? deleteOperation.getIfPrimaryTerm() : ifPrimaryTerm;

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
package org.opensearch.plugin.transport.grpc.proto.request.document.bulk;
1010

1111
import org.opensearch.action.bulk.BulkShardRequest;
12-
import org.opensearch.action.support.WriteRequest;
1312
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
13+
import org.opensearch.plugin.transport.grpc.proto.request.common.RefreshProtoUtils;
1414
import org.opensearch.protobufs.BulkRequest;
1515
import org.opensearch.rest.RestRequest;
1616
import org.opensearch.rest.action.document.RestBulkAction;
@@ -33,7 +33,7 @@ protected BulkRequestProtoUtils() {
3333

3434
/**
3535
* Prepare the request for execution.
36-
* Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)} ()}
36+
* Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)}
3737
* Please ensure to keep both implementations consistent.
3838
*
3939
* @param request the request to execute
@@ -47,8 +47,9 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
4747
FetchSourceContext defaultFetchSourceContext = FetchSourceContextProtoUtils.parseFromProtoRequest(request);
4848
String defaultPipeline = request.hasPipeline() ? request.getPipeline() : null;
4949

50-
bulkRequest = ActiveShardCountProtoUtils.getActiveShardCount(bulkRequest, request);
51-
50+
if (request.hasWaitForActiveShards()) {
51+
bulkRequest.waitForActiveShards(ActiveShardCountProtoUtils.parseProto(request.getWaitForActiveShards()));
52+
}
5253
Boolean defaultRequireAlias = request.hasRequireAlias() ? request.getRequireAlias() : null;
5354

5455
if (request.hasTimeout()) {
@@ -57,7 +58,7 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
5758
bulkRequest.timeout(BulkShardRequest.DEFAULT_TIMEOUT);
5859
}
5960

60-
bulkRequest.setRefreshPolicy(getRefreshPolicy(request));
61+
bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh()));
6162

6263
// Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x
6364
/*
@@ -80,26 +81,4 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
8081

8182
return bulkRequest;
8283
}
83-
84-
/**
85-
* Extracts the refresh policy from the bulk request.
86-
*
87-
* @param request The bulk request containing the refresh policy
88-
* @return The refresh policy as a string, or null if not specified
89-
*/
90-
public static String getRefreshPolicy(org.opensearch.protobufs.BulkRequest request) {
91-
if (!request.hasRefresh()) {
92-
return null;
93-
}
94-
switch (request.getRefresh()) {
95-
case REFRESH_TRUE:
96-
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
97-
case REFRESH_WAIT_FOR:
98-
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
99-
case REFRESH_FALSE:
100-
case REFRESH_UNSPECIFIED:
101-
default:
102-
return WriteRequest.RefreshPolicy.NONE.getValue();
103-
}
104-
}
10584
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/search/query/AbstractQueryBuilderProtoUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public static QueryBuilder parseInnerQueryBuilderProto(QueryContainer queryConta
4040
result = MatchNoneQueryBuilderProtoUtils.fromProto(queryContainer.getMatchNone());
4141
} else if (queryContainer.getTermCount() > 0) {
4242
result = TermQueryBuilderProtoUtils.fromProto(queryContainer.getTermMap());
43+
} else if (queryContainer.hasTerms()) {
44+
result = TermsQueryBuilderProtoUtils.fromProto(queryContainer.getTerms());
4345
}
4446
// TODO add more query types
4547
else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.plugin.transport.grpc.proto.request.search.query;
9+
10+
import org.opensearch.core.xcontent.XContentParser;
11+
import org.opensearch.indices.TermsLookup;
12+
import org.opensearch.protobufs.TermsLookupField;
13+
14+
/**
15+
* Utility class for converting TermQuery Protocol Buffers to OpenSearch objects.
16+
* This class provides methods to transform Protocol Buffer representations of term queries
17+
* into their corresponding OpenSearch TermQueryBuilder implementations for search operations.
18+
*/
19+
public class TermsLookupProtoUtils {
20+
21+
private TermsLookupProtoUtils() {
22+
// Utility class, no instances
23+
}
24+
25+
/**
26+
* Converts a Protocol Buffer TermQuery map to an OpenSearch TermQueryBuilder.
27+
* Similar to {@link TermsLookup#parseTermsLookup(XContentParser)}
28+
*
29+
* @param termsLookupFieldProto The map of field names to Protocol Buffer TermQuery objects
30+
* @return A configured TermsLookup instance
31+
*/
32+
protected static TermsLookup parseTermsLookup(TermsLookupField termsLookupFieldProto) {
33+
34+
String index = termsLookupFieldProto.getIndex();
35+
String id = termsLookupFieldProto.getId();
36+
String path = termsLookupFieldProto.getPath();
37+
38+
TermsLookup termsLookup = new TermsLookup(index, id, path);
39+
40+
if (termsLookupFieldProto.hasRouting()) {
41+
termsLookup.routing(termsLookupFieldProto.getRouting());
42+
}
43+
44+
if (termsLookupFieldProto.hasStore()) {
45+
termsLookup.store(termsLookupFieldProto.getStore());
46+
}
47+
return termsLookup;
48+
}
49+
}

0 commit comments

Comments
 (0)