Skip to content

Commit 883b5a8

Browse files
update ingestion status in index metadata xcontent and avoid retry for parsing error
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 833d9d2 commit 883b5a8

File tree

12 files changed

+223
-32
lines changed

12 files changed

+223
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5858
- Fix Using an excessively large reindex slice can lead to a JVM OutOfMemoryError on coordinator.([#18964](https://github.com/opensearch-project/OpenSearch/pull/18964))
5959
- [Flaky Test] Fix flaky test in SecureReactorNetty4HttpServerTransportTests with reproducible seed ([#19327](https://github.com/opensearch-project/OpenSearch/pull/19327))
6060
- Remove unnecessary looping in field data cache clear ([#19116](https://github.com/opensearch-project/OpenSearch/pull/19116))
61-
61+
- Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors ([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
6262

6363
### Dependencies
6464
- Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019))

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,8 @@ public Iterator<Setting<?>> settings() {
909909

910910
/**
911911
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
912-
* streaming source and process the updates. This mode is currently not supported along with segment replication.
912+
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
913+
* This mode is currently not supported with segment replication.
913914
*/
914915
public static final String SETTING_INGESTION_SOURCE_ALL_ACTIVE_INGESTION = "index.ingestion_source.all_active";
915916
public static final Setting<Boolean> INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING = Setting.boolSetting(
@@ -923,21 +924,33 @@ public void validate(final Boolean value) {}
923924
@Override
924925
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
925926
final Object replicationType = settings.get(INDEX_REPLICATION_TYPE_SETTING);
926-
if (ReplicationType.SEGMENT.equals(replicationType) && value) {
927+
final Object ingestionSourceType = settings.get(INGESTION_SOURCE_TYPE_SETTING);
928+
boolean isPullBasedIngestionEnabled = NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType) == false;
929+
930+
if (isPullBasedIngestionEnabled && ReplicationType.SEGMENT.equals(replicationType) && value) {
927931
throw new IllegalArgumentException(
928-
"To enable "
929-
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
930-
+ ", "
931-
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
932-
+ " should not be set to "
932+
"Replication type "
933933
+ ReplicationType.SEGMENT
934+
+ " is not supported in pull-based ingestion when "
935+
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
936+
+ " is enabled"
937+
);
938+
}
939+
940+
if (isPullBasedIngestionEnabled && ReplicationType.DOCUMENT.equals(replicationType) && value == false) {
941+
throw new IllegalArgumentException(
942+
"Replication type "
943+
+ ReplicationType.DOCUMENT
944+
+ " is not supported in pull-based ingestion when "
945+
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
946+
+ " is not enabled"
934947
);
935948
}
936949
}
937950

938951
@Override
939952
public Iterator<Setting<?>> settings() {
940-
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING);
953+
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, INGESTION_SOURCE_TYPE_SETTING);
941954
return settings.iterator();
942955
}
943956
},
@@ -981,6 +994,7 @@ public Iterator<Setting<?>> settings() {
981994
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";
982995
public static final String CONTEXT_KEY = "context";
983996
public static final String INGESTION_SOURCE_KEY = "ingestion_source";
997+
public static final String INGESTION_STATUS_KEY = "ingestion_status";
984998

985999
public static final String INDEX_STATE_FILE_PREFIX = "state-";
9861000

@@ -2304,6 +2318,11 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
23042318
indexMetadata.context.toXContent(builder, params);
23052319
}
23062320

2321+
if (indexMetadata.ingestionStatus != null) {
2322+
builder.field(INGESTION_STATUS_KEY);
2323+
indexMetadata.ingestionStatus.toXContent(builder, params);
2324+
}
2325+
23072326
builder.endObject();
23082327
}
23092328

@@ -2387,6 +2406,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
23872406
parser.skipChildren();
23882407
} else if (CONTEXT_KEY.equals(currentFieldName)) {
23892408
builder.context(Context.fromXContent(parser));
2409+
} else if (INGESTION_STATUS_KEY.equals(currentFieldName)) {
2410+
builder.ingestionStatus(IngestionStatus.fromXContent(parser));
23902411
} else {
23912412
// assume it's custom index metadata
23922413
builder.putCustom(currentFieldName, parser.mapStrings());

server/src/main/java/org/opensearch/cluster/metadata/IngestionStatus.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@
1212
import org.opensearch.core.common.io.stream.StreamInput;
1313
import org.opensearch.core.common.io.stream.StreamOutput;
1414
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.xcontent.ToXContent;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
import org.opensearch.core.xcontent.XContentParser;
1518

1619
import java.io.IOException;
1720

1821
/**
1922
* Indicates pull-based ingestion status.
2023
*/
2124
@ExperimentalApi
22-
public record IngestionStatus(boolean isPaused) implements Writeable {
25+
public record IngestionStatus(boolean isPaused) implements Writeable, ToXContent {
26+
public static final String IS_PAUSED = "is_paused";
2327

2428
public IngestionStatus(StreamInput in) throws IOException {
2529
this(in.readBoolean());
@@ -30,6 +34,37 @@ public void writeTo(StreamOutput out) throws IOException {
3034
out.writeBoolean(isPaused);
3135
}
3236

37+
@Override
38+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
39+
builder.startObject();
40+
builder.field(IS_PAUSED, isPaused);
41+
builder.endObject();
42+
return builder;
43+
}
44+
45+
public static IngestionStatus fromXContent(XContentParser parser) throws IOException {
46+
boolean isPaused = false;
47+
48+
XContentParser.Token token = parser.currentToken();
49+
if (token == null) {
50+
token = parser.nextToken();
51+
}
52+
53+
if (token == XContentParser.Token.START_OBJECT) {
54+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
55+
if (token == XContentParser.Token.FIELD_NAME) {
56+
String fieldName = parser.currentName();
57+
if (IS_PAUSED.equals(fieldName)) {
58+
parser.nextToken();
59+
isPaused = parser.booleanValue();
60+
}
61+
}
62+
}
63+
}
64+
65+
return new IngestionStatus(isPaused);
66+
}
67+
3368
public static IngestionStatus getDefaultValue() {
3469
return new IngestionStatus(false);
3570
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ protected void startPoll() {
242242
// Currently we do not have a good way to skip past the failing messages.
243243
// The user will have the option to manually update the offset and resume ingestion.
244244
// todo: support retry?
245-
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {}: {}", shardId, e);
245+
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {} for index {}: {}", shardId, indexName, e);
246246
totalConsumerErrorCount.inc();
247247
pause();
248248
}
@@ -274,7 +274,13 @@ private IngestionShardPointer processRecords(
274274
result.getPointer().asString()
275275
);
276276
} catch (Exception e) {
277-
logger.error("Error in processing a record. Shard {}, pointer {}: {}", shardId, result.getPointer().asString(), e);
277+
logger.error(
278+
"[Default Poller] Error processing record. Index={}, Shard={}, pointer={}: error={}",
279+
indexName,
280+
shardId,
281+
result.getPointer().asString(),
282+
e
283+
);
278284
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
279285
totalPollerMessageFailureCount.inc();
280286

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.index.engine.IngestionEngine;
2727
import org.opensearch.index.engine.VersionConflictEngineException;
2828
import org.opensearch.index.mapper.IdFieldMapper;
29+
import org.opensearch.index.mapper.MapperParsingException;
2930
import org.opensearch.index.mapper.ParseContext;
3031
import org.opensearch.index.mapper.ParsedDocument;
3132
import org.opensearch.index.mapper.SourceToParse;
@@ -65,33 +66,50 @@ public class MessageProcessorRunnable implements Runnable, Closeable {
6566
private volatile boolean closed = false;
6667
private volatile IngestionErrorStrategy errorStrategy;
6768

69+
private final String indexName;
70+
private final int shardId;
71+
6872
/**
6973
* Constructor.
7074
*
7175
* @param blockingQueue the blocking queue to poll messages from
7276
* @param engine the ingestion engine
77+
* @param errorStrategy the error strategy/policy to use
7378
*/
7479
public MessageProcessorRunnable(
7580
BlockingQueue<ShardUpdateMessage<? extends IngestionShardPointer, ? extends Message>> blockingQueue,
7681
IngestionEngine engine,
7782
IngestionErrorStrategy errorStrategy
7883
) {
79-
this(blockingQueue, new MessageProcessor(engine), errorStrategy);
84+
this(
85+
blockingQueue,
86+
new MessageProcessor(engine),
87+
errorStrategy,
88+
engine.config().getShardId().getIndexName(),
89+
engine.config().getShardId().getId()
90+
);
8091
}
8192

8293
/**
8394
* Constructor visible for testing.
8495
* @param blockingQueue the blocking queue to poll messages from
8596
* @param messageProcessor the message processor
97+
* @param errorStrategy the error strategy/policy to use
98+
* @param indexName the index name
99+
* @param shardId the shard ID
86100
*/
87101
MessageProcessorRunnable(
88102
BlockingQueue<ShardUpdateMessage<? extends IngestionShardPointer, ? extends Message>> blockingQueue,
89103
MessageProcessor messageProcessor,
90-
IngestionErrorStrategy errorStrategy
104+
IngestionErrorStrategy errorStrategy,
105+
String indexName,
106+
int shardId
91107
) {
92108
this.blockingQueue = Objects.requireNonNull(blockingQueue);
93109
this.messageProcessor = messageProcessor;
94110
this.errorStrategy = errorStrategy;
111+
this.indexName = indexName;
112+
this.shardId = shardId;
95113
}
96114

97115
static class MessageProcessor {
@@ -309,9 +327,10 @@ public void run() {
309327
logger.debug("Dropping message due to version conflict. ShardPointer: " + shardUpdateMessage.pointer().asString(), e);
310328
shardUpdateMessage = null;
311329
} catch (Exception e) {
330+
logger.error("[Message Processor] Error processing message. Index={}, Shard={}, error={}", indexName, shardId, e);
312331
messageProcessorMetrics.failedMessageCounter.inc();
313332
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
314-
boolean retriesExhausted = retryCount >= MIN_RETRY_COUNT || e instanceof IllegalArgumentException;
333+
boolean retriesExhausted = hasExhaustedRetries(e, retryCount);
315334
if (retriesExhausted && errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
316335
logDroppedMessage(shardUpdateMessage);
317336
shardUpdateMessage = null;
@@ -336,6 +355,15 @@ private void waitBeforeRetry() {
336355
}
337356
}
338357

358+
private boolean hasExhaustedRetries(Exception e, int retryCount) {
359+
if (retryCount >= MIN_RETRY_COUNT) {
360+
return true;
361+
}
362+
363+
// Don't retry validation/parsing errors
364+
return e instanceof IllegalArgumentException || e instanceof MapperParsingException;
365+
}
366+
339367
public MessageProcessorMetrics getMessageProcessorMetrics() {
340368
return messageProcessorMetrics;
341369
}

server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ public void testToXContent() throws IOException {
146146
+ " \"0\" : [ ]\n"
147147
+ " },\n"
148148
+ " \"rollover_info\" : { },\n"
149-
+ " \"system\" : false\n"
149+
+ " \"system\" : false,\n"
150+
+ " \"ingestion_status\" : {\n"
151+
+ " \"is_paused\" : false\n"
152+
+ " }\n"
150153
+ " }\n"
151154
+ " },\n"
152155
+ " \"index-graveyard\" : {\n"
@@ -252,7 +255,10 @@ public void testToXContent() throws IOException {
252255
+ " \"0\" : [ ]\n"
253256
+ " },\n"
254257
+ " \"rollover_info\" : { },\n"
255-
+ " \"system\" : false\n"
258+
+ " \"system\" : false,\n"
259+
+ " \"ingestion_status\" : {\n"
260+
+ " \"is_paused\" : false\n"
261+
+ " }\n"
256262
+ " }\n"
257263
+ " },\n"
258264
+ " \"index-graveyard\" : {\n"

server/src/test/java/org/opensearch/cluster/ClusterStateTests.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,10 @@ public void testToXContent() throws IOException {
275275
+ " \"time\" : 1\n"
276276
+ " }\n"
277277
+ " },\n"
278-
+ " \"system\" : false\n"
278+
+ " \"system\" : false,\n"
279+
+ " \"ingestion_status\" : {\n"
280+
+ " \"is_paused\" : false\n"
281+
+ " }\n"
279282
+ " }\n"
280283
+ " },\n"
281284
+ " \"index-graveyard\" : {\n"
@@ -477,7 +480,10 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
477480
+ " \"time\" : 1\n"
478481
+ " }\n"
479482
+ " },\n"
480-
+ " \"system\" : false\n"
483+
+ " \"system\" : false,\n"
484+
+ " \"ingestion_status\" : {\n"
485+
+ " \"is_paused\" : false\n"
486+
+ " }\n"
481487
+ " }\n"
482488
+ " },\n"
483489
+ " \"index-graveyard\" : {\n"
@@ -686,7 +692,10 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
686692
+ " \"time\" : 1\n"
687693
+ " }\n"
688694
+ " },\n"
689-
+ " \"system\" : false\n"
695+
+ " \"system\" : false,\n"
696+
+ " \"ingestion_status\" : {\n"
697+
+ " \"is_paused\" : false\n"
698+
+ " }\n"
690699
+ " }\n"
691700
+ " },\n"
692701
+ " \"index-graveyard\" : {\n"
@@ -835,7 +844,10 @@ public void testToXContentSameTypeName() throws IOException {
835844
+ " \"0\" : [ ]\n"
836845
+ " },\n"
837846
+ " \"rollover_info\" : { },\n"
838-
+ " \"system\" : false\n"
847+
+ " \"system\" : false,\n"
848+
+ " \"ingestion_status\" : {\n"
849+
+ " \"is_paused\" : false\n"
850+
+ " }\n"
839851
+ " }\n"
840852
+ " },\n"
841853
+ " \"index-graveyard\" : {\n"

0 commit comments

Comments
 (0)