Skip to content

Commit c23f437

Browse files
Add tests and disable multi-thread writes till feature is complete
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 5e99cf6 commit c23f437

File tree

4 files changed

+36
-22
lines changed

4 files changed

+36
-22
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -138,30 +138,11 @@ public void testCloseIndex() throws Exception {
138138
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
139139
}
140140

141-
public void testKafkaIngestionWithMultipleProcessorThreads() throws Exception {
142-
for (int i = 1; i <= 2000; i++) {
143-
produceData(String.valueOf(i), "name" + i, "25");
144-
}
145-
createIndexWithDefaultSettings(indexName, 1, 0, 3);
146-
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
147-
waitForState(() -> {
148-
refresh(indexName);
149-
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
150-
assertThat(response.getHits().getTotalHits().value(), is(2000L));
151-
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
152-
.getPollingIngestStats();
153-
assertNotNull(stats);
154-
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2000L));
155-
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2000L));
156-
return true;
157-
});
158-
}
159-
160141
public void testUpdateAndDelete() throws Exception {
161142
// Step 1: Produce message and wait for it to be searchable
162143

163144
produceData("1", "name", "25", defaultMessageTimestamp, "index");
164-
createIndexWithDefaultSettings(indexName, 1, 0, 3);
145+
createIndexWithDefaultSettings(1, 0);
165146
ensureGreen(indexName);
166147
waitForState(() -> {
167148
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,13 +787,15 @@ public Iterator<Setting<?>> settings() {
787787
);
788788

789789
/**
790-
* Defines the number of processor threads that will write to the lucene index.
790+
* Defines the number of processor threads that will write to the lucene index. This setting is currently disabled
791+
* and will be allowed once the feature is ready. A default value of 1 will be used.
791792
*/
792793
public static final String SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS = "index.ingestion_source.num_processor_threads";
793794
public static final Setting<Integer> INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING = Setting.intSetting(
794795
SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS,
795796
1,
796797
1,
798+
1,
797799
Setting.Property.IndexScope,
798800
Setting.Property.Final
799801
);

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ protected void startPoll() {
232232
includeBatchStartPointer = false;
233233
} catch (Throwable e) {
234234
encounteredError = true;
235-
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
235+
logger.error("Error in polling the shard {}, lastProcessedPointer {}: {}", consumer.getShardId(), lastProcessedPointer, e);
236236
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
237237

238238
if (!errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) {

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,37 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE
372372
assertEquals(DefaultStreamPoller.State.POLLING, poller.getState());
373373
}
374374

375+
public void testInitialConsumerReadTransientError() throws TimeoutException, InterruptedException {
376+
messages.add("{\"_id\":\"3\",\"_source\":{\"name\":\"bob\", \"age\": 24}}".getBytes(StandardCharsets.UTF_8));
377+
messages.add("{\"_id\":\"4\",\"_source\":{\"name\":\"alice\", \"age\": 21}}".getBytes(StandardCharsets.UTF_8));
378+
379+
DropIngestionErrorStrategy mockErrorStrategy = spy(new DropIngestionErrorStrategy("ingestion_source"));
380+
processorRunnable = new MessageProcessorRunnable(new ArrayBlockingQueue<>(5), processor, mockErrorStrategy);
381+
PartitionedBlockingQueueContainer blockingQueueContainer = new PartitionedBlockingQueueContainer(processorRunnable, 0);
382+
FakeIngestionSource.FakeIngestionConsumer consumerSpy = spy(fakeConsumer);
383+
384+
// fail consumer's first poll attempt
385+
doThrow(new RuntimeException("failed to poll messages")).doCallRealMethod()
386+
.when(consumerSpy)
387+
.readNext(any(), anyBoolean(), anyLong(), anyInt());
388+
389+
poller = new DefaultStreamPoller(
390+
new FakeIngestionSource.FakeIngestionShardPointer(0),
391+
persistedPointers,
392+
consumerSpy,
393+
blockingQueueContainer,
394+
StreamPoller.ResetState.NONE,
395+
"",
396+
mockErrorStrategy,
397+
StreamPoller.State.NONE
398+
);
399+
poller.start();
400+
Thread.sleep(sleepTime);
401+
402+
verify(mockErrorStrategy, times(1)).handleError(any(), eq(IngestionErrorStrategy.ErrorStage.POLLING));
403+
assertEquals(4, blockingQueueContainer.getTotalProcessedCount());
404+
}
405+
375406
public void testUpdateErrorStrategy() {
376407
assertTrue(poller.getErrorStrategy() instanceof DropIngestionErrorStrategy);
377408
assertTrue(processorRunnable.getErrorStrategy() instanceof DropIngestionErrorStrategy);

0 commit comments

Comments
 (0)