Skip to content

Commit 3fab3ae

Browse files
refactor pull-based ingestion to support message mappers
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent cb9c30b commit 3fab3ae

File tree

17 files changed

+627
-51
lines changed

17 files changed

+627
-51
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
1818
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
1919
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
20+
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)]
2021

2122
### Changed
2223
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,4 +653,122 @@ private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
653653
&& shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
654654
return valid;
655655
}
656+
657+
public void testRawPayloadMapperIngestion() throws Exception {
658+
// Start cluster
659+
internalCluster().startClusterManagerOnlyNode();
660+
final String nodeA = internalCluster().startDataOnlyNode();
661+
662+
// Publish 2 valid messages
663+
String validMessage1 = "{\"name\":\"alice\",\"age\":30}";
664+
String validMessage2 = "{\"name\":\"bob\",\"age\":25}";
665+
produceData(validMessage1);
666+
produceData(validMessage2);
667+
668+
// Create index with raw_payload mapper
669+
createIndex(
670+
indexName,
671+
Settings.builder()
672+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
673+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
674+
.put("ingestion_source.type", "kafka")
675+
.put("ingestion_source.param.topic", topicName)
676+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
677+
.put("ingestion_source.pointer.init.reset", "earliest")
678+
.put("ingestion_source.mapper_type", "raw_payload")
679+
.put("ingestion_source.error_strategy", "drop")
680+
.put("ingestion_source.all_active", true)
681+
.build(),
682+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
683+
);
684+
685+
ensureGreen(indexName);
686+
687+
// Wait for both messages to be indexed
688+
waitForSearchableDocs(2, List.of(nodeA));
689+
690+
// Verify stats show 2 processed messages
691+
waitForState(() -> {
692+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
693+
.getPollingIngestStats();
694+
return stats != null
695+
&& stats.getMessageProcessorStats().totalProcessedCount() == 2L
696+
&& stats.getConsumerStats().totalPolledCount() == 2L
697+
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 0L
698+
&& stats.getConsumerStats().totalPollerMessageDroppedCount() == 0L
699+
&& stats.getMessageProcessorStats().totalInvalidMessageCount() == 0L;
700+
});
701+
702+
// Validate document content
703+
SearchResponse searchResponse = client().prepareSearch(indexName).get();
704+
assertEquals(2, searchResponse.getHits().getHits().length);
705+
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
706+
Map<String, Object> source = searchResponse.getHits().getHits()[i].getSourceAsMap();
707+
assertTrue(source.containsKey("name"));
708+
assertTrue(source.containsKey("age"));
709+
}
710+
711+
// Publish invalid JSON message
712+
String invalidJsonMessage = "{ invalid json";
713+
produceData(invalidJsonMessage);
714+
715+
// Wait for consumer to encounter the error and drop it
716+
waitForState(() -> {
717+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
718+
.getPollingIngestStats();
719+
return stats != null
720+
&& stats.getConsumerStats().totalPolledCount() == 3L
721+
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
722+
&& stats.getConsumerStats().totalPollerMessageDroppedCount() == 1L
723+
&& stats.getMessageProcessorStats().totalProcessedCount() == 2L;
724+
});
725+
726+
// Publish message with invalid content that will fail at processor level
727+
String invalidFieldTypeMessage = "{\"name\":123,\"age\":\"not a number\"}";
728+
produceData(invalidFieldTypeMessage);
729+
730+
// Wait for processor to encounter the error
731+
waitForState(() -> {
732+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
733+
.getPollingIngestStats();
734+
return stats != null
735+
&& stats.getConsumerStats().totalPolledCount() == 4L
736+
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
737+
&& stats.getMessageProcessorStats().totalProcessedCount() == 3L
738+
&& stats.getMessageProcessorStats().totalFailedCount() == 1L
739+
&& stats.getMessageProcessorStats().totalFailuresDroppedCount() == 1L;
740+
});
741+
742+
// Pause ingestion, reset to offset 0, and resume
743+
pauseIngestion(indexName);
744+
waitForState(() -> {
745+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
746+
return ingestionState.getShardStates().length == 1
747+
&& ingestionState.getFailedShards() == 0
748+
&& ingestionState.getShardStates()[0].isPollerPaused()
749+
&& ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("paused");
750+
});
751+
752+
// Resume with reset to offset 0 (will re-process the 2 valid messages)
753+
resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0");
754+
waitForState(() -> {
755+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
756+
return ingestionState.getShardStates().length == 1
757+
&& ingestionState.getShardStates()[0].isPollerPaused() == false
758+
&& (ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling")
759+
|| ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("processing"));
760+
});
761+
762+
// Wait for the 3 messages to be processed by the processor after reset (1 will be dropped by the poller)
763+
waitForState(() -> {
764+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
765+
.getPollingIngestStats();
766+
return stats != null && stats.getMessageProcessorStats().totalProcessedCount() == 3L;
767+
});
768+
769+
// Verify still only 2 documents (no duplicates must be indexed)
770+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
771+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
772+
assertThat(response.getHits().getTotalHits().value(), is(2L));
773+
}
656774
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.opensearch.index.seqno.SequenceNumbers;
7575
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
7676
import org.opensearch.indices.pollingingest.StreamPoller;
77+
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
7778
import org.opensearch.indices.replication.SegmentReplicationSource;
7879
import org.opensearch.indices.replication.common.ReplicationType;
7980

@@ -923,6 +924,18 @@ public Iterator<Setting<?>> settings() {
923924
Property.Final
924925
);
925926

927+
/**
928+
* Defines how the incoming ingestion message payload is mapped to the internal message format.
929+
*/
930+
public static final String SETTING_INGESTION_SOURCE_MAPPER_TYPE = "index.ingestion_source.mapper_type";
931+
public static final Setting<IngestionMessageMapper.MapperType> INGESTION_SOURCE_MAPPER_TYPE_SETTING = new Setting<>(
932+
SETTING_INGESTION_SOURCE_MAPPER_TYPE,
933+
IngestionMessageMapper.MapperType.DEFAULT.getName(),
934+
IngestionMessageMapper.MapperType::fromString,
935+
Property.IndexScope,
936+
Property.Final
937+
);
938+
926939
/**
927940
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
928941
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
@@ -1227,6 +1240,7 @@ public IngestionSource getIngestionSource() {
12271240
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
12281241
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
12291242
final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
1243+
final IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings);
12301244

12311245
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
12321246
.setPointerInitReset(pointerInitReset)
@@ -1237,6 +1251,7 @@ public IngestionSource getIngestionSource() {
12371251
.setBlockingQueueSize(blockingQueueSize)
12381252
.setAllActiveIngestion(allActiveIngestionEnabled)
12391253
.setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval)
1254+
.setMapperType(mapperType)
12401255
.build();
12411256
}
12421257
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
import org.opensearch.common.unit.TimeValue;
1414
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1515
import org.opensearch.indices.pollingingest.StreamPoller;
16+
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
1617

1718
import java.util.HashMap;
1819
import java.util.Map;
1920
import java.util.Objects;
2021

2122
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING;
2223
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
24+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING;
2325
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
2426
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
2527
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING;
@@ -40,6 +42,7 @@ public class IngestionSource {
4042
private int blockingQueueSize;
4143
private final boolean allActiveIngestion;
4244
private final TimeValue pointerBasedLagUpdateInterval;
45+
private final IngestionMessageMapper.MapperType mapperType;
4346

4447
private IngestionSource(
4548
String type,
@@ -51,7 +54,8 @@ private IngestionSource(
5154
int numProcessorThreads,
5255
int blockingQueueSize,
5356
boolean allActiveIngestion,
54-
TimeValue pointerBasedLagUpdateInterval
57+
TimeValue pointerBasedLagUpdateInterval,
58+
IngestionMessageMapper.MapperType mapperType
5559
) {
5660
this.type = type;
5761
this.pointerInitReset = pointerInitReset;
@@ -63,6 +67,7 @@ private IngestionSource(
6367
this.blockingQueueSize = blockingQueueSize;
6468
this.allActiveIngestion = allActiveIngestion;
6569
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
70+
this.mapperType = mapperType;
6671
}
6772

6873
public String getType() {
@@ -105,6 +110,10 @@ public TimeValue getPointerBasedLagUpdateInterval() {
105110
return pointerBasedLagUpdateInterval;
106111
}
107112

113+
public IngestionMessageMapper.MapperType getMapperType() {
114+
return mapperType;
115+
}
116+
108117
@Override
109118
public boolean equals(Object o) {
110119
if (this == o) return true;
@@ -119,7 +128,8 @@ public boolean equals(Object o) {
119128
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
120129
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize)
121130
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion)
122-
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval);
131+
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval)
132+
&& Objects.equals(mapperType, ingestionSource.mapperType);
123133
}
124134

125135
@Override
@@ -134,7 +144,8 @@ public int hashCode() {
134144
numProcessorThreads,
135145
blockingQueueSize,
136146
allActiveIngestion,
137-
pointerBasedLagUpdateInterval
147+
pointerBasedLagUpdateInterval,
148+
mapperType
138149
);
139150
}
140151

@@ -164,6 +175,9 @@ public String toString() {
164175
+ allActiveIngestion
165176
+ ", pointerBasedLagUpdateInterval="
166177
+ pointerBasedLagUpdateInterval
178+
+ ", mapperType='"
179+
+ mapperType
180+
+ '\''
167181
+ '}';
168182
}
169183

@@ -225,6 +239,7 @@ public static class Builder {
225239
private TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault(
226240
Settings.EMPTY
227241
);
242+
private IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.getDefault(Settings.EMPTY);
228243

229244
public Builder(String type) {
230245
this.type = type;
@@ -239,6 +254,7 @@ public Builder(IngestionSource ingestionSource) {
239254
this.blockingQueueSize = ingestionSource.blockingQueueSize;
240255
this.allActiveIngestion = ingestionSource.allActiveIngestion;
241256
this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval;
257+
this.mapperType = ingestionSource.mapperType;
242258
}
243259

244260
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -291,6 +307,11 @@ public Builder setPointerBasedLagUpdateInterval(TimeValue pointerBasedLagUpdateI
291307
return this;
292308
}
293309

310+
public Builder setMapperType(IngestionMessageMapper.MapperType mapperType) {
311+
this.mapperType = mapperType;
312+
return this;
313+
}
314+
294315
public IngestionSource build() {
295316
return new IngestionSource(
296317
type,
@@ -302,7 +323,8 @@ public IngestionSource build() {
302323
numProcessorThreads,
303324
blockingQueueSize,
304325
allActiveIngestion,
305-
pointerBasedLagUpdateInterval
326+
pointerBasedLagUpdateInterval,
327+
mapperType
306328
);
307329
}
308330

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
282282
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,
283283
IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING,
284284
IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING,
285+
IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING,
285286

286287
// Settings for search replica
287288
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ private void initializeStreamPoller(
146146
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
147147
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
148148
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
149+
.mapperType(ingestionSource.getMapperType())
149150
.build();
150151
registerStreamPollerListener();
151152

0 commit comments

Comments
 (0)