Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))
- Allow maxPollSize and pollTimeout in IngestionSource to be configurable ([#17863](https://github.com/opensearch-project/OpenSearch/pull/17863))
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand All @@ -43,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce 512 byte limit to search and ingest pipeline IDs ([#17786](https://github.com/opensearch-project/OpenSearch/pull/17786))
- Avoid skewed segment replication lag metric ([#17831](https://github.com/opensearch-project/OpenSearch/pull/17831))
- Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568))
- Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568))

### Dependencies
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669))
Expand All @@ -64,6 +66,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.google.api.grpc:proto-google-iam-v1` from 1.33.0 to 1.49.1 ([#17811](https://github.com/opensearch-project/OpenSearch/pull/17811))
- Bump `com.azure:azure-core` from 1.54.1 to 1.55.3 ([#17810](https://github.com/opensearch-project/OpenSearch/pull/17810))
- Bump `org.apache.poi` version from 5.2.5 to 5.4.1 in /plugins/ingest-attachment ([#17887](https://github.com/opensearch-project/OpenSearch/pull/17887))
- Bump `org.opensearch:protobufs` from 0.2.0 to 0.3.0 ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))

### Changed

Expand Down
2 changes: 1 addition & 1 deletion plugins/transport-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-util:${versions.grpc}"
implementation "io.perfmark:perfmark-api:0.26.0"
implementation "org.opensearch:protobufs:0.2.0"
implementation "org.opensearch:protobufs:0.3.0"
testImplementation project(':test:framework')
}

Expand Down
1 change: 0 additions & 1 deletion plugins/transport-grpc/licenses/protobufs-0.2.0.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/transport-grpc/licenses/protobufs-0.3.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5e22ed37e4535c9c9cfeb8993f5294ba1201795c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import java.util.Map;

/**
* Utility class for converting ObjectMap Protobuf type to a Java object.
* Utility class for converting ObjectMap Protocol Buffer types to standard Java objects.
* This class provides methods to transform Protocol Buffer representations of object maps
* into their corresponding Java Map, List, and primitive type equivalents.
*/
public class ObjectMapProtoUtils {

Expand All @@ -25,11 +27,12 @@ private ObjectMapProtoUtils() {
}

/**
* Converts a ObjectMap to Java POJO representation.
* Similar to {@link XContentParser#map()}
* Converts a Protocol Buffer ObjectMap to a Java Map representation.
* Similar to {@link XContentParser#map()}, this method transforms the structured
* Protocol Buffer data into a standard Java Map with appropriate value types.
*
* @param objectMap The generic protobuf objectMap to convert
* @return A Protobuf builder .google.protobuf.Struct representation
* @param objectMap The Protocol Buffer ObjectMap to convert
* @return A Java Map containing the key-value pairs from the Protocol Buffer ObjectMap
*/
public static Map<String, Object> fromProto(ObjectMap objectMap) {

Expand All @@ -43,11 +46,14 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
}

/**
* Converts a ObjectMap.Value to Java POJO representation.
* Similar to {@link XContentParser#map()}
* Converts a Protocol Buffer ObjectMap.Value to an appropriate Java object representation.
* This method handles various value types (numbers, strings, booleans, lists, nested maps)
* and converts them to their Java equivalents.
*
* @param value The generic protobuf ObjectMap.Value to convert
* @return A Protobuf builder .google.protobuf.Struct representation
* @param value The Protocol Buffer ObjectMap.Value to convert
* @return A Java object representing the value (could be a primitive type, String, List, or Map)
* @throws UnsupportedOperationException if the value is null, which cannot be added to a Java map
* @throws IllegalArgumentException if the value type cannot be converted
*/
public static Object fromProto(ObjectMap.Value value) {
if (value.hasNullValue()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.transport.grpc.proto.request.common;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.protobufs.OpType;

/**
* Utility class for converting SourceConfig Protocol Buffers to FetchSourceContext objects.
* This class handles the conversion of Protocol Buffer representations to their
* corresponding OpenSearch objects.
*/
public class OpTypeProtoUtils {

private OpTypeProtoUtils() {
// Utility class, no instances
}

/**
*
* Similar to {@link DocWriteRequest.OpType}
*
* @param opType
* @return
*/
public static DocWriteRequest.OpType fromProto(OpType opType) {

switch (opType) {
case OP_TYPE_CREATE:
return DocWriteRequest.OpType.CREATE;

Check warning on line 36 in plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/OpTypeProtoUtils.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/OpTypeProtoUtils.java#L36

Added line #L36 was not covered by tests
case OP_TYPE_INDEX:
return DocWriteRequest.OpType.INDEX;

Check warning on line 38 in plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/OpTypeProtoUtils.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/OpTypeProtoUtils.java#L38

Added line #L38 was not covered by tests
default:
throw new UnsupportedOperationException("Invalid optype: " + opType);

Check warning on line 40 in plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/OpTypeProtoUtils.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/OpTypeProtoUtils.java#L40

Added line #L40 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.transport.grpc.proto.request.common;

import org.opensearch.action.support.WriteRequest;

/**
* Utility class for converting SourceConfig Protocol Buffers to FetchSourceContext objects.
* This class handles the conversion of Protocol Buffer representations to their
* corresponding OpenSearch objects.
*/
public class RefreshProtoUtils {

private RefreshProtoUtils() {
// Utility class, no instances
}

/**
* Extracts the refresh policy from the bulk request.
*
* @param refresh The bulk request containing the refresh policy
* @return The refresh policy as a string, or null if not specified
*/
public static String getRefreshPolicy(org.opensearch.protobufs.Refresh refresh) {
switch (refresh) {
case REFRESH_TRUE:
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
case REFRESH_WAIT_FOR:
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
case REFRESH_FALSE:
case REFRESH_UNSPECIFIED:
default:
return WriteRequest.RefreshPolicy.NONE.getValue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.plugin.transport.grpc.proto.request.document.bulk;

import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.protobufs.BulkRequest;
import org.opensearch.protobufs.WaitForActiveShards;

/**
Expand All @@ -33,37 +32,25 @@ protected ActiveShardCountProtoUtils() {
* the wait_for_active_shards parameter from the Protocol Buffer request and applies
* the appropriate ActiveShardCount setting to the OpenSearch bulk request.
*
* @param bulkRequest The OpenSearch bulk request to modify
* @param request The Protocol Buffer request containing the active shard count settings
* @return The modified OpenSearch bulk request with updated active shard count settings
* @param waitForActiveShards The protobuf object containing the active shard count
* @return The modified bulk request
*/
public static org.opensearch.action.bulk.BulkRequest getActiveShardCount(
org.opensearch.action.bulk.BulkRequest bulkRequest,
BulkRequest request
) {
if (!request.hasWaitForActiveShards()) {
return bulkRequest;
}
WaitForActiveShards waitForActiveShards = request.getWaitForActiveShards();
public static ActiveShardCount parseProto(WaitForActiveShards waitForActiveShards) {

switch (waitForActiveShards.getWaitForActiveShardsCase()) {
case WaitForActiveShards.WaitForActiveShardsCase.WAIT_FOR_ACTIVE_SHARD_OPTIONS:
switch (waitForActiveShards.getWaitForActiveShardOptions()) {
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED:
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_ALL:
bulkRequest.waitForActiveShards(ActiveShardCount.ALL);
break;
return ActiveShardCount.ALL;
default:
bulkRequest.waitForActiveShards(ActiveShardCount.DEFAULT);
break;
return ActiveShardCount.DEFAULT;
}
break;
case WaitForActiveShards.WaitForActiveShardsCase.INT32_VALUE:
bulkRequest.waitForActiveShards(waitForActiveShards.getInt32Value());
break;
return ActiveShardCount.from(waitForActiveShards.getInt32Value());
default:
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
return ActiveShardCount.DEFAULT;
}
return bulkRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.common.ScriptProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.response.document.common.VersionTypeProtoUtils;
import org.opensearch.protobufs.BulkRequest;
import org.opensearch.protobufs.BulkRequestBody;
import org.opensearch.protobufs.CreateOperation;
import org.opensearch.protobufs.DeleteOperation;
import org.opensearch.protobufs.IndexOperation;
import org.opensearch.protobufs.OpType;
import org.opensearch.protobufs.UpdateOperation;
import org.opensearch.script.Script;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
Expand Down Expand Up @@ -110,7 +112,7 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
String id = null;
String routing = valueOrDefault(defaultRouting, request.getRouting());
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
IndexOperation.OpType opType = null;
OpType opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -226,17 +228,8 @@ public static IndexRequest buildCreateRequest(
routing = createOperation.hasRouting() ? createOperation.getRouting() : routing;
version = createOperation.hasVersion() ? createOperation.getVersion() : version;
if (createOperation.hasVersionType()) {
switch (createOperation.getVersionType()) {
case VERSION_TYPE_EXTERNAL:
versionType = VersionType.EXTERNAL;
break;
case VERSION_TYPE_EXTERNAL_GTE:
versionType = VersionType.EXTERNAL_GTE;
break;
default:
versionType = VersionType.INTERNAL;
break;
}
versionType = VersionTypeProtoUtils.fromProto(createOperation.getVersionType());

}
pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline;
ifSeqNo = createOperation.hasIfSeqNo() ? createOperation.getIfSeqNo() : ifSeqNo;
Expand Down Expand Up @@ -276,7 +269,7 @@ public static IndexRequest buildCreateRequest(
public static IndexRequest buildIndexRequest(
IndexOperation indexOperation,
byte[] document,
IndexOperation.OpType opType,
OpType opType,
String index,
String id,
String routing,
Expand All @@ -293,17 +286,7 @@ public static IndexRequest buildIndexRequest(
routing = indexOperation.hasRouting() ? indexOperation.getRouting() : routing;
version = indexOperation.hasVersion() ? indexOperation.getVersion() : version;
if (indexOperation.hasVersionType()) {
switch (indexOperation.getVersionType()) {
case VERSION_TYPE_EXTERNAL:
versionType = VersionType.EXTERNAL;
break;
case VERSION_TYPE_EXTERNAL_GTE:
versionType = VersionType.EXTERNAL_GTE;
break;
default:
versionType = VersionType.INTERNAL;
break;
}
versionType = VersionTypeProtoUtils.fromProto(indexOperation.getVersionType());
}
pipeline = indexOperation.hasPipeline() ? indexOperation.getPipeline() : pipeline;
ifSeqNo = indexOperation.hasIfSeqNo() ? indexOperation.getIfSeqNo() : ifSeqNo;
Expand All @@ -326,7 +309,7 @@ public static IndexRequest buildIndexRequest(
.routing(routing)
.version(version)
.versionType(versionType)
.create(opType.equals(IndexOperation.OpType.OP_TYPE_CREATE))
.create(opType.equals(OpType.OP_TYPE_CREATE))
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
Expand Down Expand Up @@ -487,17 +470,7 @@ public static DeleteRequest buildDeleteRequest(
routing = deleteOperation.hasRouting() ? deleteOperation.getRouting() : routing;
version = deleteOperation.hasVersion() ? deleteOperation.getVersion() : version;
if (deleteOperation.hasVersionType()) {
switch (deleteOperation.getVersionType()) {
case VERSION_TYPE_EXTERNAL:
versionType = VersionType.EXTERNAL;
break;
case VERSION_TYPE_EXTERNAL_GTE:
versionType = VersionType.EXTERNAL_GTE;
break;
default:
versionType = VersionType.INTERNAL;
break;
}
versionType = VersionTypeProtoUtils.fromProto(deleteOperation.getVersionType());
}
ifSeqNo = deleteOperation.hasIfSeqNo() ? deleteOperation.getIfSeqNo() : ifSeqNo;
ifPrimaryTerm = deleteOperation.hasIfPrimaryTerm() ? deleteOperation.getIfPrimaryTerm() : ifPrimaryTerm;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package org.opensearch.plugin.transport.grpc.proto.request.document.bulk;

import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.common.RefreshProtoUtils;
import org.opensearch.protobufs.BulkRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.document.RestBulkAction;
Expand All @@ -33,7 +33,7 @@

/**
* Prepare the request for execution.
* Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)} ()}
* Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)}
* Please ensure to keep both implementations consistent.
*
* @param request the request to execute
Expand All @@ -47,8 +47,9 @@
FetchSourceContext defaultFetchSourceContext = FetchSourceContextProtoUtils.parseFromProtoRequest(request);
String defaultPipeline = request.hasPipeline() ? request.getPipeline() : null;

bulkRequest = ActiveShardCountProtoUtils.getActiveShardCount(bulkRequest, request);

if (request.hasWaitForActiveShards()) {
bulkRequest.waitForActiveShards(ActiveShardCountProtoUtils.parseProto(request.getWaitForActiveShards()));

Check warning on line 51 in plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java#L51

Added line #L51 was not covered by tests
}
Boolean defaultRequireAlias = request.hasRequireAlias() ? request.getRequireAlias() : null;

if (request.hasTimeout()) {
Expand All @@ -57,7 +58,7 @@
bulkRequest.timeout(BulkShardRequest.DEFAULT_TIMEOUT);
}

bulkRequest.setRefreshPolicy(getRefreshPolicy(request));
bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh()));

// Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x
/*
Expand All @@ -80,26 +81,4 @@

return bulkRequest;
}

/**
* Extracts the refresh policy from the bulk request.
*
* @param request The bulk request containing the refresh policy
* @return The refresh policy as a string, or null if not specified
*/
public static String getRefreshPolicy(org.opensearch.protobufs.BulkRequest request) {
if (!request.hasRefresh()) {
return null;
}
switch (request.getRefresh()) {
case REFRESH_TRUE:
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
case REFRESH_WAIT_FOR:
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
case REFRESH_FALSE:
case REFRESH_UNSPECIFIED:
default:
return WriteRequest.RefreshPolicy.NONE.getValue();
}
}
}
Loading
Loading