diff --git a/CHANGELOG.md b/CHANGELOG.md index 875e912f9ea19..77c73bd1cc4c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299) - [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273)) - Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803)) +- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768)) ### Changed - Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912)) diff --git a/server/src/main/java/org/opensearch/OpenSearchServerException.java b/server/src/main/java/org/opensearch/OpenSearchServerException.java index 695fe4dbac767..247a23dc4bd57 100644 --- a/server/src/main/java/org/opensearch/OpenSearchServerException.java +++ b/server/src/main/java/org/opensearch/OpenSearchServerException.java @@ -1224,5 +1224,13 @@ public static void registerExceptions() { V_3_0_0 ) ); + registerExceptionHandle( + new OpenSearchExceptionHandle( + org.opensearch.index.engine.IngestionEngineException.class, + org.opensearch.index.engine.IngestionEngineException::new, + 176, + V_3_0_0 + ) + ); } } diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 1d5d104394558..4839b9ceb463b 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -145,6 +145,16 @@ protected Set fetchPersistedOffsets(DirectoryReader direc @Override public IndexResult index(Index index) throws IOException { + throw new IngestionEngineException("push-based indexing is not supported in ingestion engine, use streaming source instead"); + } + + /** + * Indexes the document into the engine. This is used internally by the stream poller only. + * @param index the index request + * @return the index result + * @throws IOException if an error occurs + */ + public IndexResult indexInternal(Index index) throws IOException { assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field(); ensureOpen(); final IndexResult indexResult; @@ -168,7 +178,7 @@ private void addDocs(final List docs, final IndexWriter i @Override public DeleteResult delete(Delete delete) throws IOException { - return null; + throw new IngestionEngineException("push-based deletion is not supported in ingestion engine, use streaming source instead"); } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngineException.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngineException.java new file mode 100644 index 0000000000000..9bf53d35bacf6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngineException.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchWrapperException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; + +import java.io.IOException; + +/** + * Exception thrown when there is an error in the ingestion engine. + * + * @opensearch.internal + */ +public class IngestionEngineException extends OpenSearchException implements OpenSearchWrapperException { + public IngestionEngineException(String message) { + super(message); + } + + public IngestionEngineException(StreamInput in) throws IOException { + super(in); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java index 2066f348243b8..23aa1a043d774 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java @@ -119,7 +119,7 @@ protected void process(Message message, IngestionShardPointer pointer) { Engine.Operation operation = getOperation(payload, pointer); switch (operation.operationType()) { case INDEX: - engine.index((Engine.Index) operation); + engine.indexInternal((Engine.Index) operation); break; case DELETE: engine.delete((Engine.Delete) operation); diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index dd55abb65d19f..9773a0dcd16a0 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -87,6 +87,7 @@ import org.opensearch.core.xcontent.XContentLocation; import org.opensearch.crypto.CryptoRegistryException; import org.opensearch.env.ShardLockObtainFailedException; +import org.opensearch.index.engine.IngestionEngineException; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.query.QueryShardException; import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException; @@ -896,6 +897,7 @@ public void testIds() { ids.put(173, ViewAlreadyExistsException.class); ids.put(174, InvalidIndexContextException.class); ids.put(175, ResponseLimitBreachedException.class); + ids.put(176, IngestionEngineException.class); ids.put(10001, IndexCreateBlockException.class); Map, Integer> reverse = new HashMap<>(); diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index 8f84f59cfbccc..d8c5ebb16a36a 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.mockito.Mockito; + import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; @@ -128,6 +130,13 @@ public void testRecovery() throws IOException { waitForResults(ingestionEngine, 4); } + public void testPushAPIFailures() { + Engine.Index indexMock = Mockito.mock(Engine.Index.class); + assertThrows(IngestionEngineException.class, () -> ingestionEngine.index(indexMock)); + Engine.Delete deleteMock = Mockito.mock(Engine.Delete.class); + assertThrows(IngestionEngineException.class, () -> ingestionEngine.delete(deleteMock)); + } + public void testCreationFailure() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);