1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
- Support multi-threaded writes, updates and deletes in pull-based ingestion ([#17771](https://github.com/opensearch-project/OpenSearch/pull/17771))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
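
The new changelog entry above corresponds to the per-index `index.ingestion_source.num_processor_threads` setting added further down in this change. As a rough sketch only, an index could opt in along the lines of the integration-test helper in this PR; the `ingestion_source.type` key, index name, and Kafka endpoint here are illustrative assumptions, not taken from this diff:

    import org.opensearch.common.settings.Settings;

    Settings ingestionSettings = Settings.builder()
        .put("ingestion_source.type", "kafka")                          // assumed source-type key
        .put("ingestion_source.param.topic", "test-topic")              // Kafka params, mirroring the tests below
        .put("ingestion_source.param.bootstrap_servers", "localhost:9092")
        .put("ingestion_source.num_processor_threads", 3)               // new: parallel message processors writing to Lucene
        .put("index.replication.type", "SEGMENT")
        .build();
    createIndex("my-ingestion-index", ingestionSettings);                // test-style helper, illustrative only
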
@@ -15,7 +15,9 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -73,8 +75,8 @@ public void testKafkaIngestion() {
}

public void testKafkaIngestion_RewindByTimeStamp() {
produceData("1", "name1", "24", 1739459500000L);
produceData("2", "name2", "20", 1739459800000L);
produceData("1", "name1", "24", 1739459500000L, "index");
produceData("2", "name2", "20", 1739459800000L, "index");

// create an index with ingestion source from kafka
createIndex(
Expand Down Expand Up @@ -135,4 +137,55 @@ public void testCloseIndex() throws Exception {
ensureGreen(indexName);
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
}

public void testKafkaIngestionWithMultipleProcessorThreads() throws Exception {
for (int i = 1; i <= 2000; i++) {
produceData(String.valueOf(i), "name" + i, "25");
}
createIndexWithDefaultSettings(indexName, 1, 0, 3);
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
waitForState(() -> {
refresh(indexName);
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(2000L));
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(stats);
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2000L));
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2000L));
return true;
});
}

public void testUpdateAndDelete() throws Exception {
// Step 1: Produce message and wait for it to be searchable

produceData("1", "name", "25", defaultMessageTimestamp, "index");
createIndexWithDefaultSettings(indexName, 1, 0, 3);
ensureGreen(indexName);
waitForState(() -> {
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
});

// Step 2: Update age field from 25 to 30 and validate

produceData("1", "name", "30", defaultMessageTimestamp, "index");
waitForState(() -> {
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
return 30 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
});

// Step 3: Delete the document and validate
produceData("1", "name", "30", defaultMessageTimestamp, "delete");
waitForState(() -> {
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
return response.getHits().getTotalHits().value() == 0;
});
}
}
@@ -93,14 +93,15 @@ private void stopKafka() {
}

protected void produceData(String id, String name, String age) {
produceData(id, name, age, defaultMessageTimestamp);
produceData(id, name, age, defaultMessageTimestamp, "index");
}

protected void produceData(String id, String name, String age, long timestamp) {
protected void produceData(String id, String name, String age, long timestamp, String opType) {
String payload = String.format(
Locale.ROOT,
"{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
"{\"_id\":\"%s\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
id,
opType,
name,
age
);
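
With the corrected format string above (the previous one emitted a malformed "_op_type:" key), produceData now renders payloads like the following, values illustrative:

    {"_id":"1", "_op_type":"index","_source":{"name":"name1", "age": 25}}
    {"_id":"1", "_op_type":"delete","_source":{"name":"name1", "age": 25}}
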
@@ -159,10 +160,10 @@ protected ResumeIngestionResponse resumeIngestion(String indexName) throws Execu
}

protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
createIndexWithDefaultSettings(indexName, numShards, numReplicas);
createIndexWithDefaultSettings(indexName, numShards, numReplicas, 1);
}

protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas) {
protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas, int numProcessorThreads) {
createIndex(
indexName,
Settings.builder()
@@ -173,6 +174,7 @@ protected void createIndexWithDefaultSettings(String indexName, int numShards, i
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.put("ingestion_source.num_processor_threads", numProcessorThreads)
// set custom kafka consumer properties
.put("ingestion_source.param.fetch.min.bytes", 30000)
.put("ingestion_source.param.enable.auto.commit", false)
@@ -246,8 +246,8 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
createIndexWithDefaultSettings("index1", 5, 0);
createIndexWithDefaultSettings("index2", 5, 0);
createIndexWithDefaultSettings("index1", 5, 0, 1);
createIndexWithDefaultSettings("index2", 5, 0, 1);
ensureGreen("index1");
ensureGreen("index2");

@@ -786,6 +786,18 @@ public Iterator<Setting<?>> settings() {
Property.Dynamic
);

/**
* Defines the number of processor threads that will write to the lucene index.
*/
public static final String SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS = "index.ingestion_source.num_processor_threads";
public static final Setting<Integer> INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING = Setting.intSetting(
SETTING_INGESTION_SOURCE_NUM_PROCESSOR_THREADS,
1,
1,
Setting.Property.IndexScope,
Setting.Property.Final
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
@@ -1025,8 +1037,9 @@ public IngestionSource getIngestionSource() {
);

final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, numProcessorThreads, ingestionSourceParams);
}
return null;
}
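
The new setting is registered as IndexScope and Final with a minimum of 1, so it cannot be changed on a live index and rejects non-positive values. A hypothetical unit-test-style check of that behavior (not part of this PR) might look like:

    Setting<Integer> s = IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
    assertEquals(1, (int) s.get(Settings.EMPTY));                        // defaults to a single processor thread
    expectThrows(IllegalArgumentException.class,
        () -> s.get(Settings.builder().put(s.getKey(), 0).build()));     // values below the minimum of 1 are rejected
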
@@ -23,18 +23,21 @@ public class IngestionSource {
private String type;
private PointerInitReset pointerInitReset;
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
private int numMessageProcessorThreads;
private Map<String, Object> params;

public IngestionSource(
String type,
PointerInitReset pointerInitReset,
IngestionErrorStrategy.ErrorStrategy errorStrategy,
int numMessageProcessorThreads,
Map<String, Object> params
) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
this.errorStrategy = errorStrategy;
this.numMessageProcessorThreads = numMessageProcessorThreads;
}

public String getType() {
@@ -53,6 +56,10 @@ public Map<String, Object> params() {
return params;
}

public int getNumMessageProcessorThreads() {
return numMessageProcessorThreads;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -61,12 +68,13 @@ public boolean equals(Object o) {
return Objects.equals(type, ingestionSource.type)
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
&& Objects.equals(params, ingestionSource.params);
&& Objects.equals(params, ingestionSource.params)
&& Objects.equals(numMessageProcessorThreads, ingestionSource.numMessageProcessorThreads);
}

@Override
public int hashCode() {
return Objects.hash(type, pointerInitReset, params, errorStrategy);
return Objects.hash(type, pointerInitReset, params, errorStrategy, numMessageProcessorThreads);
}

@Override
@@ -81,6 +89,9 @@ public String toString() {
+ ",error_strategy='"
+ errorStrategy
+ '\''
+ ",numMessageProcessorThreads='"
+ numMessageProcessorThreads
+ '\''
+ ", params="
+ params
+ '}';
@@ -268,6 +268,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
@@ -10,6 +10,7 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
@@ -22,6 +23,8 @@
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.Translog;
@@ -43,6 +46,7 @@
import java.util.Set;
import java.util.function.BiFunction;

import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT;

/**
@@ -117,7 +121,8 @@ public void start() {
resetState,
resetValue,
ingestionErrorStrategy,
initialPollerState
initialPollerState,
ingestionSource.getNumMessageProcessorThreads()
);
streamPoller.start();
}
@@ -153,8 +158,13 @@ public IndexResult index(Index index) throws IOException {
}

private IndexResult indexIntoLucene(Index index) throws IOException {
// todo: handle updates
addDocs(index.docs(), indexWriter);
if (index.getAutoGeneratedIdTimestamp() != UNSET_AUTO_GENERATED_TIMESTAMP) {
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
+ index.getAutoGeneratedIdTimestamp();
addDocs(index.docs(), indexWriter);
} else {
updateDocs(index.uid(), index.docs(), indexWriter);
}
return new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), true);
}

@@ -166,9 +176,28 @@ private void addDocs(final List<ParseContext.Document> docs, final IndexWriter i
}
}

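    // Soft updates atomically mark any previously indexed document with the same uid as deleted (via the soft-deletes field) and index the new version.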
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
} else {
indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
}
}

@Override
public DeleteResult delete(Delete delete) throws IOException {
return null;
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
ensureOpen();
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
+ doc
+ " ]";
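        // Mark the tombstone itself as soft-deleted, then atomically soft-delete any live document with the same uid.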
doc.add(softDeletesField);
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
// delete result is unused in ingestion flow
return new DeleteResult(1, delete.primaryTerm(), -1, true);
}

@Override
@@ -161,6 +161,7 @@ public class InternalEngine extends Engine {
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
protected final SoftDeletesPolicy softDeletesPolicy;
protected final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
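    // Shared with engine subclasses (e.g. the pull-based ingestion engine) so they can soft-update and soft-delete documents.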
protected final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();

@Nullable
protected final String historyUUID;
@@ -197,7 +198,6 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocDeletes = new CounterMetric();
private final CounterMetric numDocAppends = new CounterMetric();
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

private final CompletionStatsCache completionStatsCache;