Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fb1dd6d
Changes to support upload and download of merge segments using the In…
kh3ra Jul 4, 2025
6439bf5
Unit tests
kh3ra Jul 4, 2025
02dbf7a
Fixing build issues
kh3ra Jul 4, 2025
c91301e
Fixing build issues - forbiddenAPIs/spotlessApply
kh3ra Jul 4, 2025
2265b3e
Upload merge segments in low priority, minor fixes
kh3ra Jul 8, 2025
000b8f0
Addressing review comments + rebase main
kh3ra Jul 17, 2025
cfc8c46
Test fixes + javadocs
kh3ra Jul 18, 2025
61d4f82
1. Bug fix to RemoteDirectory.DownloadRateLimiterProvider
kh3ra Jul 20, 2025
712b9fa
Bug fix to replica updates to ActiveMergesRegistry
kh3ra Jul 21, 2025
ea6d616
Addressing review comments - round 2
kh3ra Jul 24, 2025
28b5574
new tests + test fixes + minor bug fixes
kh3ra Jul 25, 2025
471c8e9
Bug fix
kh3ra Jul 25, 2025
0e72408
Fixes to RemoteStorePublishMergedSegmentActionTests
kh3ra Jul 25, 2025
1b273f3
Adding integration tests
kh3ra Jul 28, 2025
b5a362c
Tracking stats for merged segment warmer
kh3ra Jul 29, 2025
8eb8340
Revert "Tracking stats for merged segment warmer"
kh3ra Jul 29, 2025
ed2b52a
Addressing review comments for tests
kh3ra Jul 30, 2025
df85a75
Addressing review comments
kh3ra Jul 30, 2025
5f310db
Rebasing
kh3ra Jul 30, 2025
ad5406a
spotlessApply
kh3ra Jul 30, 2025
9f483ca
test fix
kh3ra Jul 30, 2025
6f792ff
Empty commit
kh3ra Jul 30, 2025
84ac6e9
Adding tests, enhancing logs
kh3ra Aug 4, 2025
a68171b
Adding MergedSegmentWarmerFactory tests + enhancing existing tests
kh3ra Aug 4, 2025
9e49316
Applying spotless
kh3ra Aug 4, 2025
0b4b8a4
Added test for Timeout case for RemoteStoreReplicationSource
kh3ra Aug 4, 2025
21d8c13
Restored RemoteSegmentStoreDirectory PublicAPI + added changelog
kh3ra Aug 4, 2025
162b3eb
Merge branch 'main' into dev/remote-merged-segment-warmer
kh3ra Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Integration tests for merged segment warming on remote-store enabled indices: after a merge,
 * the primary publishes the merged segment to replicas (action
 * {@code indices:admin/remote_publish_merged_segment[r]}) so replicas can pre-fetch it.
 * Each test starts its own data nodes; a cluster-manager-only node is started in {@link #setup()}.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreMergedSegmentWarmerIT extends SegmentReplicationBaseIT {
    // Repository path for the remote store; created lazily on first nodeSettings() call and
    // then reused so every node in the test cluster shares the same repository location.
    private Path absolutePath;

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        if (absolutePath == null) {
            absolutePath = randomRepoPath().toAbsolutePath();
        }
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
            .build();
    }

    @Override
    protected Settings featureFlagSettings() {
        // The merged segment warmer is gated behind an experimental feature flag.
        Settings.Builder featureSettings = Settings.builder();
        featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
        return featureSettings.build();
    }

    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
    }

    /**
     * Verifies that a force-merge triggers at least one merged-segment publish request to the
     * replica, carrying a {@link RemoteStorePublishMergedSegmentRequest} payload.
     */
    public void testMergeSegmentWarmerRemote() throws Exception {
        final String node1 = internalCluster().startDataOnlyNode();
        final String node2 = internalCluster().startDataOnlyNode();
        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);
        MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
            TransportService.class,
            node1
        );
        MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
            TransportService.class,
            node2
        );
        final CountDownLatch latch = new CountDownLatch(1);
        // Intercept outgoing transport requests; count down once a replica-side publish request
        // is observed, then forward the request unchanged.
        StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
            if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
                assertTrue(
                    ((TransportReplicationAction.ConcreteReplicaRequest) request)
                        .getRequest() instanceof RemoteStorePublishMergedSegmentRequest
                );
                latch.countDown();
            }
            connection.sendRequest(requestId, action, request, options);
        };

        // Index 30 docs with immediate refresh so multiple small segments accumulate.
        for (int i = 0; i < 30; i++) {
            client().prepareIndex(INDEX_NAME)
                .setId(String.valueOf(i))
                .setSource("foo" + i, "bar" + i)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
        }

        waitForSearchableDocs(30, node1, node2);

        // Install the interceptor only after indexing, so only merge-time publishes are counted.
        mockTransportServiceNode1.addSendBehavior(behavior);
        mockTransportServiceNode2.addSendBehavior(behavior);

        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
        waitForSegmentCount(INDEX_NAME, 2, logger);
        assertTrue(latch.await(10, TimeUnit.SECONDS));
        mockTransportServiceNode1.clearAllRules();
        mockTransportServiceNode2.clearAllRules();
    }

    /**
     * Verifies that concurrent merges produce multiple publish requests handled by more than one
     * transport thread. Merge policy is tuned (5 segments per tier, max-merge-at-once 5,
     * merge-on-flush disabled) to encourage several merges from the 30 indexed docs.
     */
    public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
        String node1 = internalCluster().startDataOnlyNode();
        String node2 = internalCluster().startDataOnlyNode();
        createIndex(
            INDEX_NAME,
            Settings.builder()
                .put(indexSettings())
                .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
                .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
                .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
                .build()
        );
        ensureGreen(INDEX_NAME);
        MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
            TransportService.class,
            node1
        );
        MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
            TransportService.class,
            node2
        );
        CountDownLatch latch = new CountDownLatch(2);
        AtomicLong numInvocations = new AtomicLong(0);
        // Thread names recorded per publish request; >1 distinct name implies concurrent handling.
        Set<String> executingThreads = ConcurrentHashMap.newKeySet();
        StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
            if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
                assertTrue(
                    ((TransportReplicationAction.ConcreteReplicaRequest) request)
                        .getRequest() instanceof RemoteStorePublishMergedSegmentRequest
                );
                latch.countDown();
                numInvocations.incrementAndGet();
                executingThreads.add(Thread.currentThread().getName());
            }
            connection.sendRequest(requestId, action, request, options);
        };

        // Interceptors installed before indexing here: merges (and publishes) may begin while
        // documents are still being indexed.
        mockTransportServiceNode1.addSendBehavior(behavior);
        mockTransportServiceNode2.addSendBehavior(behavior);

        for (int i = 0; i < 30; i++) {
            client().prepareIndex(INDEX_NAME)
                .setId(String.valueOf(i))
                .setSource("foo" + i, "bar" + i)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
        }

        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));

        waitForSegmentCount(INDEX_NAME, 2, logger);
        logger.info("Number of merge invocations: {}", numInvocations.get());
        assertTrue(latch.await(10, TimeUnit.SECONDS));
        // Verify concurrent execution: multiple unique threads handled merge publish requests.
        assertTrue(executingThreads.size() > 1);
        // And the publish path was exercised more than once overall.
        assertTrue(numInvocations.get() > 1);
        mockTransportServiceNode1.clearAllRules();
        mockTransportServiceNode2.clearAllRules();
    }

    /**
     * Verifies that a force-merge completes successfully when the replica shard is unassigned
     * (single data node, index left yellow): warming must not block or fail the merge.
     */
    public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
        internalCluster().startDataOnlyNode();
        createIndex(INDEX_NAME);
        ensureYellowAndNoInitializingShards(INDEX_NAME);

        for (int i = 0; i < 30; i++) {
            client().prepareIndex(INDEX_NAME)
                .setId(String.valueOf(i))
                .setSource("foo" + i, "bar" + i)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
        }

        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
        final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
        // NOTE(review): this counts shard groups for the index, not segments — presumably
        // asserting the single primary shard responded; confirm intent.
        assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.indices.replication;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -22,11 +26,13 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -244,4 +250,26 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
return closeable.get();
}
}

/**
 * Waits up to one minute until both the primary and the replica copies of {@code indexName}
 * each report exactly {@code segmentCount} distinct segment names, logging the observed
 * segment sets on every poll.
 */
public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
    assertBusy(() -> {
        final IndicesSegmentResponse response = client().admin().indices().prepareSegments(indexName).get();
        final Set<String> primarySegments = Sets.newHashSet();
        final Set<String> replicaSegments = Sets.newHashSet();
        for (IndexShardSegments shardGroup : response.getIndices().get(indexName).getShards().values()) {
            for (ShardSegments shardCopy : shardGroup.getShards()) {
                // Route each copy's segment names into the primary or replica bucket.
                final Set<String> bucket = shardCopy.getShardRouting().primary() ? primarySegments : replicaSegments;
                for (Segment segment : shardCopy.getSegments()) {
                    bucket.add(segment.getName());
                }
            }
        }
        logger.info("primary segments: {}, replica segments: {}", primarySegments, replicaSegments);
        assertEquals(segmentCount, primarySegments.size());
        assertEquals(segmentCount, replicaSegments.size());
    }, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,8 @@ public String toString() {
*
* @opensearch.internal
*/
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {

public static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
// Visibility widened from protected to public so tests outside this package can inspect replica requests.
private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,35 @@

package org.opensearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Implementation of a {@link IndexWriter.IndexReaderWarmer} when local on-disk segment replication is enabled.
* Implementation of a {@link IndexWriter.IndexReaderWarmer} for merged segment replication in
* local on-disk and remote store enabled domains.
*
* @opensearch.internal
*/
public class LocalMergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
private final TransportService transportService;
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final IndexShard indexShard;

public LocalMergedSegmentWarmer(
private final Logger logger;

public MergedSegmentWarmer(
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
Expand All @@ -40,14 +46,30 @@ public LocalMergedSegmentWarmer(
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.indexShard = indexShard;
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
}

@Override
public void warm(LeafReader leafReader) throws IOException {
    // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
    assert leafReader instanceof SegmentReader;
    assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();

    // Use the monotonic clock for elapsed-time measurement; System.currentTimeMillis() is
    // wall-clock time and can jump (NTP adjustments), yielding negative or bogus durations.
    final long startTimeNanos = System.nanoTime();
    SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
    logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
    indexShard.publishMergedSegment(segmentCommitInfo);
    // Lambda form defers the size lookup and message formatting unless TRACE is enabled.
    logger.trace(() -> {
        long segmentSize = -1;
        try {
            segmentSize = segmentCommitInfo.sizeInBytes();
        } catch (IOException ignored) {
            // Best effort: size is informational only; -1 marks it as unknown in the log line.
        }
        return new ParameterizedMessage(
            "Completed segment warming for {}. Size: {}B, Timing: {}ms",
            segmentCommitInfo.info.name,
            segmentSize,
            ((System.nanoTime() - startTimeNanos) / 1_000_000L)
        );
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet
}

public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
if (shard.indexSettings().isAssignedOnRemoteNode()) {
return new RemoteStoreMergedSegmentWarmer(transportService, recoverySettings, clusterService);
} else if (shard.indexSettings().isSegRepLocalEnabled()) {
return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
} else if (shard.indexSettings().isDocumentReplication()) {
// MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication,
// IndexWriter.IndexReaderWarmer should be null.
Expand Down

This file was deleted.

Loading
Loading