Skip to content

Commit 5e99cf6

Browse files
Handle DROP error policy to avoid failing the entire polled batch
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 57ae526 commit 5e99cf6

File tree

3 files changed

+14
-12
lines changed

3 files changed

+14
-12
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,20 +139,20 @@ public void testCloseIndex() throws Exception {
139139
}
140140

141141
public void testKafkaIngestionWithMultipleProcessorThreads() throws Exception {
142-
for (int i = 1; i <= 100; i++) {
142+
for (int i = 1; i <= 2000; i++) {
143143
produceData(String.valueOf(i), "name" + i, "25");
144144
}
145145
createIndexWithDefaultSettings(indexName, 1, 0, 3);
146146
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
147147
waitForState(() -> {
148148
refresh(indexName);
149149
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
150-
assertThat(response.getHits().getTotalHits().value(), is(100L));
150+
assertThat(response.getHits().getTotalHits().value(), is(2000L));
151151
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
152152
.getPollingIngestStats();
153153
assertNotNull(stats);
154-
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(100L));
155-
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(100L));
154+
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2000L));
155+
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2000L));
156156
return true;
157157
});
158158
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ protected void startPoll() {
144144
}
145145
logger.info("Starting poller for shard {}", consumer.getShardId());
146146

147+
IngestionShardPointer lastProcessedPointer = null;
148+
boolean encounteredError = false;
147149
while (true) {
148150
try {
149151
if (closed) {
@@ -190,29 +192,27 @@ protected void startPoll() {
190192
}
191193

192194
state = State.POLLING;
193-
194195
List<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> results;
195196

196-
if (includeBatchStartPointer) {
197+
// todo: handle multi-writer scenarios to provide atleast once semantics
198+
if (encounteredError && lastProcessedPointer != null) {
199+
results = consumer.readNext(lastProcessedPointer, false, MAX_POLL_SIZE, POLL_TIMEOUT);
200+
} else if (includeBatchStartPointer) {
197201
results = consumer.readNext(batchStartPointer, true, MAX_POLL_SIZE, POLL_TIMEOUT);
198202
} else {
199203
results = consumer.readNext(MAX_POLL_SIZE, POLL_TIMEOUT);
200204
}
201205

206+
encounteredError = false;
202207
if (results.isEmpty()) {
203208
// no new records
204209
continue;
205210
}
206211

207212
state = State.PROCESSING;
208213
// process the records
209-
boolean firstInBatch = true;
210214
for (IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> result : results) {
211-
if (firstInBatch) {
212-
// update the batch start pointer to the next batch
213-
batchStartPointer = result.getPointer();
214-
firstInBatch = false;
215-
}
215+
lastProcessedPointer = result.getPointer();
216216

217217
// check if the message is already processed
218218
if (isProcessed(result.getPointer())) {
@@ -231,6 +231,7 @@ protected void startPoll() {
231231
// for future reads, we do not need to include the batch start pointer, and read from the last successful pointer.
232232
includeBatchStartPointer = false;
233233
} catch (Throwable e) {
234+
encounteredError = true;
234235
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
235236
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
236237

server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void add(IngestionShardConsumer.ReadResult<? extends IngestionShardPointe
130130
id = (String) payloadMap.get(ID);
131131
} else {
132132
id = IngestionUtils.generateID();
133+
payloadMap.put(ID, id);
133134
autoGeneratedIdTimestamp = System.currentTimeMillis();
134135
}
135136

0 commit comments

Comments (0)