Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
- Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918))
- Add subdirectory-aware store module with recovery support ([#19132](https://github.com/opensearch-project/OpenSearch/pull/19132))

- Add a dynamic cluster setting to control the enablement of the merged segment warmer ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
### Changed
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))
- IllegalArgumentException when scroll ID references a node not found in Cluster ([#19031](https://github.com/opensearch-project/OpenSearch/pull/19031))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.ConnectTransportException;
Expand All @@ -45,7 +46,10 @@
public class MergedSegmentWarmerIT extends SegmentReplicationIT {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -42,6 +43,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
.build();
}

Expand All @@ -62,13 +64,11 @@ public void testMergeSegmentWarmerRemote() throws Exception {
final String node2 = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
TransportService.class,
node1
);
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(

String primaryShardNode = findprimaryShardNode(INDEX_NAME);
MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance(
TransportService.class,
node2
primaryShardNode
);
final CountDownLatch latch = new CountDownLatch(1);
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
Expand All @@ -82,6 +82,8 @@ public void testMergeSegmentWarmerRemote() throws Exception {
connection.sendRequest(requestId, action, request, options);
};

mockTransportServicePrimary.addSendBehavior(behavior);

for (int i = 0; i < 30; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
Expand All @@ -92,14 +94,10 @@ public void testMergeSegmentWarmerRemote() throws Exception {

waitForSearchableDocs(30, node1, node2);

mockTransportServiceNode1.addSendBehavior(behavior);
mockTransportServiceNode2.addSendBehavior(behavior);

client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
waitForSegmentCount(INDEX_NAME, 2, logger);
assertTrue(latch.await(10, TimeUnit.SECONDS));
mockTransportServiceNode1.clearAllRules();
mockTransportServiceNode2.clearAllRules();
mockTransportServicePrimary.clearAllRules();
}

public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
Expand All @@ -115,14 +113,13 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
.build()
);
ensureGreen(INDEX_NAME);
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
TransportService.class,
node1
);
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(

String primaryShardNode = findprimaryShardNode(INDEX_NAME);
MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance(
TransportService.class,
node2
primaryShardNode
);

CountDownLatch latch = new CountDownLatch(2);
AtomicLong numInvocations = new AtomicLong(0);
Set<String> executingThreads = ConcurrentHashMap.newKeySet();
Expand All @@ -139,8 +136,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
connection.sendRequest(requestId, action, request, options);
};

mockTransportServiceNode1.addSendBehavior(behavior);
mockTransportServiceNode2.addSendBehavior(behavior);
mockTransportServicePrimary.addSendBehavior(behavior);

for (int i = 0; i < 30; i++) {
client().prepareIndex(INDEX_NAME)
Expand All @@ -158,8 +154,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
assertTrue(executingThreads.size() > 1);
// Verify concurrent execution by checking that multiple unique threads handled merge operations
assertTrue(numInvocations.get() > 1);
mockTransportServiceNode1.clearAllRules();
mockTransportServiceNode2.clearAllRules();
mockTransportServicePrimary.clearAllRules();
}

public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
Expand All @@ -179,4 +174,60 @@ public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
}

/**
 * Verifies that the primary does not send a merged-segment publish request once the warmer has been
 * switched off through the dynamic cluster setting.
 *
 * A send behavior installed on the primary node's transport service counts down a latch whenever the
 * {@code indices:admin/remote_publish_merged_segment[r]} action is sent; the test passes only if the
 * latch is still untouched after a force merge and a 5 second wait.
 */
public void testMergeSegmentWarmerWithWarmingDisabled() throws Exception {
    internalCluster().startDataOnlyNode();
    internalCluster().startDataOnlyNode();
    createIndex(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    String primaryNodeName = findprimaryShardNode(INDEX_NAME);
    // Override the node-level default (enabled in nodeSettings) with a persistent cluster setting.
    // NOTE(review): the persistent setting is not reset afterwards; presumably the cluster is
    // test-scoped and discarded after each test - confirm against the class's cluster scope.
    internalCluster().client()
        .admin()
        .cluster()
        .prepareUpdateSettings()
        .setPersistentSettings(
            Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), false).build()
        )
        .get();

    MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance(
        TransportService.class,
        primaryNodeName
    );

    CountDownLatch warmingLatch = new CountDownLatch(1);
    StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
        if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
            warmingLatch.countDown(); // This should NOT happen
        }
        // Always forward the request so the stub only observes traffic and never blocks it.
        connection.sendRequest(requestId, action, request, options);
    };

    mockTransportServicePrimary.addSendBehavior(behavior);
    try {
        for (int i = 0; i < 30; i++) {
            client().prepareIndex(INDEX_NAME)
                .setId(String.valueOf(i))
                .setSource("foo" + i, "bar" + i)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
        }

        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
        final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
        assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
        // await() returning false means the latch was never counted down within the timeout.
        assertFalse("Warming should be skipped when disabled", warmingLatch.await(5, TimeUnit.SECONDS));
    } finally {
        // Remove the stub even when an assertion or request above fails, so it cannot outlive this test.
        mockTransportServicePrimary.clearAllRules();
    }
}

/**
 * Resolves the name of the node that currently holds the primary copy of shard 0 of the given index.
 *
 * @param indexName name of the index whose shard 0 primary is looked up
 * @return the name of the node hosting that primary shard
 */
private String findprimaryShardNode(String indexName) {
    // First read: routing table -> id of the node the primary of shard 0 is assigned to.
    final String primaryNodeId = internalCluster().clusterService()
        .state()
        .routingTable()
        .index(indexName)
        .shard(0)
        .primaryShard()
        .currentNodeId();
    // Second read: translate the node id into the node's name.
    return internalCluster().clusterService().state().nodes().get(primaryNodeId).getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public MergedSegmentWarmer(

@Override
public void warm(LeafReader leafReader) throws IOException {
if (shouldWarm() == false) {
return;
}
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
assert leafReader instanceof SegmentReader;
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
Expand All @@ -72,4 +75,9 @@ public void warm(LeafReader leafReader) throws IOException {
);
});
}

/**
 * Reports whether merged segment warming is currently enabled, as read from the shard's recovery settings.
 * Package-private for tests.
 */
boolean shouldWarm() {
    return indexShard.getRecoverySettings().isMergedSegmentReplicationWarmerEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.index.IndexWriter;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;
Expand All @@ -34,12 +35,15 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet
}

public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
} else if (shard.indexSettings().isDocumentReplication()) {
// MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication,
// IndexWriter.IndexReaderWarmer should be null.
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false
|| shard.indexSettings().isDocumentReplication()) {
// MergedSegmentWarmerFactory#get is called by IndexShard#newEngineConfig on the initialization of a new indexShard and
// in cases of updates to shard state.
// 1. IndexWriter.IndexReaderWarmer should be null when IndexMetadata.INDEX_REPLICATION_TYPE_SETTING == ReplicationType.DOCUMENT
// 2. IndexWriter.IndexReaderWarmer should be null when the FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG == false
return null;
} else if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
}
// Only the known cases are handled above; anything else throws, which keeps the IndexReaderWarmer behaviour predictable.
throw new IllegalStateException(shard.shardId() + " can't determine IndexReaderWarmer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Setting.Validator;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -75,6 +77,28 @@ public class RecoverySettings {
Property.NodeScope
);

/**
 * Dynamic, node-scoped setting that enables the merged segment warming (pre-copy) feature. Defaults to {@code false}.
 * Setting it to {@code true} is rejected unless {@link FeatureFlags#MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG} is enabled.
 */
public static final Setting<Boolean> INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING = Setting.boolSetting(
    "indices.replication.merged_segment_warmer_enabled",
    false,
    new Validator<Boolean>() {
        @Override
        public void validate(Boolean value) {
            final boolean featureFlagEnabled = FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG);
            if (featureFlagEnabled || value == false) {
                // Nothing to reject: either the feature flag is on, or the warmer is being turned off.
                return;
            }
            final String message = "FeatureFlag "
                + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG
                + " must be enabled to set this property to true.";
            throw new IllegalArgumentException(message);
        }
    },
    Property.Dynamic,
    Property.NodeScope
);

/**
* Individual speed setting for merged segment replication, default -1B to reuse the setting of recovery.
*/
Expand Down Expand Up @@ -211,6 +235,7 @@ public class RecoverySettings {

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile boolean mergedSegmentReplicationWarmerEnabled;
private volatile ByteSizeValue mergedSegmentReplicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
Expand Down Expand Up @@ -250,6 +275,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
this.mergedSegmentReplicationWarmerEnabled = INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.get(settings);
this.mergedSegmentReplicationMaxBytesPerSec = INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
this.mergedSegmentReplicationTimeout = INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING.get(settings);
replicationRateLimiter = getReplicationRateLimiter(replicationMaxBytesPerSec);
Expand All @@ -261,6 +287,10 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(
INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING,
this::setIndicesMergedSegmentReplicationWarmerEnabled
);
clusterSettings.addSettingsUpdateConsumer(
INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
this::setMergedSegmentReplicationMaxBytesPerSec
Expand Down Expand Up @@ -442,4 +472,12 @@ private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStre
this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams;
}

/**
 * Returns the current value of the merged segment replication warmer flag held by this instance.
 *
 * @return {@code true} if the merged segment warmer is enabled
 */
public boolean isMergedSegmentReplicationWarmerEnabled() {
    return this.mergedSegmentReplicationWarmerEnabled;
}

/**
 * Stores a new value for the merged segment replication warmer flag.
 *
 * @param mergedSegmentReplicationWarmerEnabled {@code true} to enable the merged segment warmer, {@code false} to disable it
 */
public void setIndicesMergedSegmentReplicationWarmerEnabled(boolean mergedSegmentReplicationWarmerEnabled) {
this.mergedSegmentReplicationWarmerEnabled = mergedSegmentReplicationWarmerEnabled;
}

}
Loading
Loading