Skip to content

Commit 3782d1d

Browse files
Support multiple processor threads in pull-based ingestion
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 137683e commit 3782d1d

File tree

15 files changed

+496
-95
lines changed

15 files changed

+496
-95
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
1717
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
1818
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
19+
- Support multi-threaded writes in pull-based ingestion ([#17771](https://github.com/opensearch-project/OpenSearch/pull/17771))
1920

2021
### Changed
2122
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,37 @@ public void testCloseIndex() throws Exception {
135135
ensureGreen(indexName);
136136
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
137137
}
138+
139+
public void testKafkaIngestionWithMultipleProcessorThreads() {
140+
for (int i = 1; i <= 100; i++) {
141+
produceData(String.valueOf(i), "name" + i, "25");
142+
}
143+
144+
createIndex(
145+
indexName,
146+
Settings.builder()
147+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
148+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
149+
.put("ingestion_source.type", "kafka")
150+
.put("ingestion_source.pointer.init.reset", "earliest")
151+
.put("ingestion_source.param.topic", topicName)
152+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
153+
.put("index.replication.type", "SEGMENT")
154+
.put("ingestion_source.num_processor_threads", 3)
155+
.build(),
156+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
157+
);
158+
159+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
160+
await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
161+
refresh(indexName);
162+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
163+
assertThat(response.getHits().getTotalHits().value(), is(100L));
164+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
165+
.getPollingIngestStats();
166+
assertNotNull(stats);
167+
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(100L));
168+
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(100L));
169+
});
170+
}
138171
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,18 @@ public Iterator<Setting<?>> settings() {
784784
Property.Dynamic
785785
);
786786

787+
/**
788+
* Defines the number of processor threads that will write to the lucene index.
789+
*/
790+
public static final String SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS = "index.ingestion_source.num_processor_threads";
791+
public static final Setting<Integer> INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING = Setting.intSetting(
792+
SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS,
793+
1,
794+
1,
795+
Setting.Property.IndexScope,
796+
Setting.Property.Final
797+
);
798+
787799
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
788800
"index.ingestion_source.param.",
789801
key -> new Setting<>(key, "", (value) -> {
@@ -1023,8 +1035,9 @@ public IngestionSource getIngestionSource() {
10231035
);
10241036

10251037
final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
1038+
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
10261039
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
1027-
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
1040+
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, numProcessorThreads, ingestionSourceParams);
10281041
}
10291042
return null;
10301043
}

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,21 @@ public class IngestionSource {
2323
private String type;
2424
private PointerInitReset pointerInitReset;
2525
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
26+
private int numMessageProcessorThreads;
2627
private Map<String, Object> params;
2728

2829
public IngestionSource(
2930
String type,
3031
PointerInitReset pointerInitReset,
3132
IngestionErrorStrategy.ErrorStrategy errorStrategy,
33+
int numMessageProcessorThreads,
3234
Map<String, Object> params
3335
) {
3436
this.type = type;
3537
this.pointerInitReset = pointerInitReset;
3638
this.params = params;
3739
this.errorStrategy = errorStrategy;
40+
this.numMessageProcessorThreads = numMessageProcessorThreads;
3841
}
3942

4043
public String getType() {
@@ -53,6 +56,10 @@ public Map<String, Object> params() {
5356
return params;
5457
}
5558

59+
public int getNumMessageProcessorThreads() {
60+
return numMessageProcessorThreads;
61+
}
62+
5663
@Override
5764
public boolean equals(Object o) {
5865
if (this == o) return true;
@@ -61,12 +68,13 @@ public boolean equals(Object o) {
6168
return Objects.equals(type, ingestionSource.type)
6269
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
6370
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
64-
&& Objects.equals(params, ingestionSource.params);
71+
&& Objects.equals(params, ingestionSource.params)
72+
&& Objects.equals(numMessageProcessorThreads, ingestionSource.numMessageProcessorThreads);
6573
}
6674

6775
@Override
6876
public int hashCode() {
69-
return Objects.hash(type, pointerInitReset, params, errorStrategy);
77+
return Objects.hash(type, pointerInitReset, params, errorStrategy, numMessageProcessorThreads);
7078
}
7179

7280
@Override
@@ -81,6 +89,9 @@ public String toString() {
8189
+ ",error_strategy='"
8290
+ errorStrategy
8391
+ '\''
92+
+ ",numMessageProcessorThreads='"
93+
+ numMessageProcessorThreads
94+
+ '\''
8495
+ ", params="
8596
+ params
8697
+ '}';

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
267267
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
268268
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
269269
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
270+
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,
270271

271272
// validate that built-in similarities don't get redefined
272273
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public void start() {
117117
resetState,
118118
resetValue,
119119
ingestionErrorStrategy,
120-
initialPollerState
120+
initialPollerState,
121+
ingestionSource.getNumMessageProcessorThreads()
121122
);
122123
streamPoller.start();
123124
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import java.util.Locale;
2222
import java.util.Objects;
2323
import java.util.Set;
24-
import java.util.concurrent.ArrayBlockingQueue;
25-
import java.util.concurrent.BlockingQueue;
2624
import java.util.concurrent.ExecutorService;
2725
import java.util.concurrent.Executors;
2826

@@ -48,8 +46,6 @@ public class DefaultStreamPoller implements StreamPoller {
4846

4947
private ExecutorService consumerThread;
5048

51-
private ExecutorService processorThread;
52-
5349
// start of the batch, inclusive
5450
private IngestionShardPointer batchStartPointer;
5551
private boolean includeBatchStartPointer = false;
@@ -59,16 +55,14 @@ public class DefaultStreamPoller implements StreamPoller {
5955

6056
private Set<IngestionShardPointer> persistedPointers;
6157

62-
private BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
63-
64-
private MessageProcessorRunnable processorRunnable;
65-
6658
private final CounterMetric totalPolledCount = new CounterMetric();
6759

6860
// A pointer to the max persisted pointer for optimizing the check
6961
@Nullable
7062
private IngestionShardPointer maxPersistedPointer;
7163

64+
private PartitionedBlockingQueueContainer blockingQueueContainer;
65+
7266
public DefaultStreamPoller(
7367
IngestionShardPointer startPointer,
7468
Set<IngestionShardPointer> persistedPointers,
@@ -77,13 +71,14 @@ public DefaultStreamPoller(
7771
ResetState resetState,
7872
String resetValue,
7973
IngestionErrorStrategy errorStrategy,
80-
State initialState
74+
State initialState,
75+
int numProcessorThreads
8176
) {
8277
this(
8378
startPointer,
8479
persistedPointers,
8580
consumer,
86-
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
81+
new PartitionedBlockingQueueContainer(numProcessorThreads, consumer.getShardId(), ingestionEngine, errorStrategy),
8782
resetState,
8883
resetValue,
8984
errorStrategy,
@@ -95,7 +90,7 @@ public DefaultStreamPoller(
9590
IngestionShardPointer startPointer,
9691
Set<IngestionShardPointer> persistedPointers,
9792
IngestionShardConsumer consumer,
98-
MessageProcessorRunnable processorRunnable,
93+
PartitionedBlockingQueueContainer blockingQueueContainer,
9994
ResetState resetState,
10095
String resetValue,
10196
IngestionErrorStrategy errorStrategy,
@@ -110,22 +105,13 @@ public DefaultStreamPoller(
110105
if (!this.persistedPointers.isEmpty()) {
111106
maxPersistedPointer = this.persistedPointers.stream().max(IngestionShardPointer::compareTo).get();
112107
}
113-
this.processorRunnable = processorRunnable;
114-
blockingQueue = processorRunnable.getBlockingQueue();
108+
this.blockingQueueContainer = blockingQueueContainer;
115109
this.consumerThread = Executors.newSingleThreadExecutor(
116110
r -> new Thread(
117111
r,
118112
String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", consumer.getShardId(), System.currentTimeMillis())
119113
)
120114
);
121-
122-
// TODO: allow multiple threads for processing the messages in parallel
123-
this.processorThread = Executors.newSingleThreadExecutor(
124-
r -> new Thread(
125-
r,
126-
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
127-
)
128-
);
129115
this.errorStrategy = errorStrategy;
130116
}
131117

@@ -143,7 +129,7 @@ public void start() {
143129
// when we start, we need to include the batch start pointer in the read for the first read
144130
includeBatchStartPointer = true;
145131
consumerThread.submit(this::startPoll);
146-
processorThread.submit(processorRunnable);
132+
blockingQueueContainer.startProcessorThreads();
147133
}
148134

149135
/**
@@ -234,7 +220,7 @@ protected void startPoll() {
234220
continue;
235221
}
236222
totalPolledCount.inc();
237-
blockingQueue.put(result);
223+
blockingQueueContainer.add(result);
238224

239225
logger.debug(
240226
"Put message {} with pointer {} to the blocking queue",
@@ -311,10 +297,10 @@ public void close() {
311297
logger.error("Error in closing the poller of shard {}: {}", consumer.getShardId(), e);
312298
}
313299
}
314-
blockingQueue.clear();
300+
315301
consumerThread.shutdown();
316302
// interrupts the processor
317-
processorThread.shutdownNow();
303+
blockingQueueContainer.close();
318304
logger.info("closed the poller of shard {}", consumer.getShardId());
319305
}
320306

@@ -337,7 +323,7 @@ public IngestionShardPointer getBatchStartPointer() {
337323
public PollingIngestStats getStats() {
338324
PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
339325
builder.setTotalPolledCount(totalPolledCount.count());
340-
builder.setTotalProcessedCount(processorRunnable.getStats().count());
326+
builder.setTotalProcessedCount(blockingQueueContainer.getTotalProcessedCount());
341327
return builder.build();
342328
}
343329

@@ -353,6 +339,6 @@ public IngestionErrorStrategy getErrorStrategy() {
353339
@Override
354340
public void updateErrorStrategy(IngestionErrorStrategy errorStrategy) {
355341
this.errorStrategy = errorStrategy;
356-
processorRunnable.setErrorStrategy(errorStrategy);
342+
blockingQueueContainer.updateErrorStrategy(errorStrategy);
357343
}
358344
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.opensearch.common.UUIDs;
12+
import org.opensearch.common.xcontent.XContentHelper;
13+
import org.opensearch.core.common.bytes.BytesArray;
14+
import org.opensearch.core.common.bytes.BytesReference;
15+
import org.opensearch.core.xcontent.MediaTypeRegistry;
16+
17+
import java.util.Map;
18+
19+
/**
20+
* Holds common utilities for streaming ingestion.
21+
*/
22+
public class IngestionUtils {
23+
24+
public static Map<String, Object> getParsedPayloadMap(byte[] payload) {
25+
BytesReference payloadBR = new BytesArray(payload);
26+
Map<String, Object> payloadMap = XContentHelper.convertToMap(payloadBR, false, MediaTypeRegistry.xContentType(payloadBR)).v2();
27+
return payloadMap;
28+
}
29+
30+
public static String generateID() {
31+
return UUIDs.base64UUID();
32+
}
33+
}

0 commit comments

Comments
 (0)