Skip to content

Commit eeabe3a

Browse files
authored
[Backport 2.17] [RW Separation] Add polling segment replication for search replicas (#15627) (#15718)
* [RW Separation] Add polling segment replication for search replicas (#15627) (cherry picked from commit 375c0bf) Signed-off-by: Marc Handalian <[email protected]> (cherry picked from commit d3b3a93) Signed-off-by: Marc Handalian <[email protected]> * fix compilation from extra ctors on 2.x line Signed-off-by: Marc Handalian <[email protected]> --------- Signed-off-by: Marc Handalian <[email protected]>
1 parent 4589765 commit eeabe3a

File tree

12 files changed

+482
-17
lines changed

12 files changed

+482
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3939
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
4040
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
4141
- [Workload Management] Add query group level failure tracking ([#15527](https://github.com/opensearch-project/OpenSearch/pull/15527))
42-
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
42+
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
4343
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
4444
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))
4545
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.common.util.FeatureFlags;
14+
import org.opensearch.test.OpenSearchIntegTestCase;
15+
import org.junit.After;
16+
import org.junit.Before;
17+
18+
import java.nio.file.Path;
19+
20+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
21+
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
22+
23+
private static final String REPOSITORY_NAME = "test-remote-store-repo";
24+
protected Path absolutePath;
25+
26+
private Boolean useRemoteStore;
27+
28+
@Before
29+
public void randomizeRemoteStoreEnabled() {
30+
useRemoteStore = randomBoolean();
31+
}
32+
33+
@Override
34+
protected Settings nodeSettings(int nodeOrdinal) {
35+
if (useRemoteStore) {
36+
if (absolutePath == null) {
37+
absolutePath = randomRepoPath().toAbsolutePath();
38+
}
39+
return Settings.builder()
40+
.put(super.nodeSettings(nodeOrdinal))
41+
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
42+
.build();
43+
}
44+
return super.nodeSettings(nodeOrdinal);
45+
}
46+
47+
@After
48+
public void teardown() {
49+
if (useRemoteStore) {
50+
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
51+
}
52+
}
53+
54+
@Override
55+
public Settings indexSettings() {
56+
return Settings.builder()
57+
.put(super.indexSettings())
58+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
59+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
60+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
61+
.build();
62+
}
63+
64+
@Override
65+
protected Settings featureFlagSettings() {
66+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
67+
}
68+
69+
public void testReplication() throws Exception {
70+
internalCluster().startClusterManagerOnlyNode();
71+
final String primary = internalCluster().startDataOnlyNode();
72+
createIndex(INDEX_NAME);
73+
ensureYellowAndNoInitializingShards(INDEX_NAME);
74+
final String replica = internalCluster().startDataOnlyNode();
75+
ensureGreen(INDEX_NAME);
76+
77+
final int docCount = 10;
78+
for (int i = 0; i < docCount; i++) {
79+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
80+
}
81+
refresh(INDEX_NAME);
82+
waitForSearchableDocs(docCount, primary, replica);
83+
}
84+
85+
}

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.opensearch.index.engine.EngineFactory;
7474
import org.opensearch.index.mapper.MapperService;
7575
import org.opensearch.index.shard.IndexEventListener;
76+
import org.opensearch.index.shard.IndexShard;
7677
import org.opensearch.index.shard.IndexingOperationListener;
7778
import org.opensearch.index.shard.SearchOperationListener;
7879
import org.opensearch.index.similarity.SimilarityService;
@@ -729,6 +730,56 @@ public IndexService newIndexService(
729730
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
730731
RecoverySettings recoverySettings,
731732
RemoteStoreSettings remoteStoreSettings
733+
) throws IOException {
734+
return newIndexService(
735+
indexCreationContext,
736+
environment,
737+
xContentRegistry,
738+
shardStoreDeleter,
739+
circuitBreakerService,
740+
bigArrays,
741+
threadPool,
742+
scriptService,
743+
clusterService,
744+
client,
745+
indicesQueryCache,
746+
mapperRegistry,
747+
indicesFieldDataCache,
748+
namedWriteableRegistry,
749+
idFieldDataEnabled,
750+
valuesSourceRegistry,
751+
remoteDirectoryFactory,
752+
translogFactorySupplier,
753+
clusterDefaultRefreshIntervalSupplier,
754+
recoverySettings,
755+
remoteStoreSettings,
756+
(s) -> {}
757+
);
758+
}
759+
760+
public IndexService newIndexService(
761+
IndexService.IndexCreationContext indexCreationContext,
762+
NodeEnvironment environment,
763+
NamedXContentRegistry xContentRegistry,
764+
IndexService.ShardStoreDeleter shardStoreDeleter,
765+
CircuitBreakerService circuitBreakerService,
766+
BigArrays bigArrays,
767+
ThreadPool threadPool,
768+
ScriptService scriptService,
769+
ClusterService clusterService,
770+
Client client,
771+
IndicesQueryCache indicesQueryCache,
772+
MapperRegistry mapperRegistry,
773+
IndicesFieldDataCache indicesFieldDataCache,
774+
NamedWriteableRegistry namedWriteableRegistry,
775+
BooleanSupplier idFieldDataEnabled,
776+
ValuesSourceRegistry valuesSourceRegistry,
777+
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
778+
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
779+
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
780+
RecoverySettings recoverySettings,
781+
RemoteStoreSettings remoteStoreSettings,
782+
Consumer<IndexShard> replicator
732783
) throws IOException {
733784
final IndexEventListener eventListener = freeze();
734785
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -789,7 +840,8 @@ public IndexService newIndexService(
789840
recoverySettings,
790841
remoteStoreSettings,
791842
fileCache,
792-
compositeIndexSettings
843+
compositeIndexSettings,
844+
replicator
793845
);
794846
success = true;
795847
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@
136136
import static java.util.Collections.emptyMap;
137137
import static java.util.Collections.unmodifiableMap;
138138
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
139+
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING;
139140
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
140141

141142
/**
@@ -174,6 +175,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
174175
private volatile AsyncTranslogFSync fsyncTask;
175176
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
176177
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
178+
private volatile AsyncReplicationTask asyncReplicationTask;
177179

178180
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
179181
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -194,6 +196,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
194196
private final RemoteStoreSettings remoteStoreSettings;
195197
private final FileCache fileCache;
196198
private final CompositeIndexSettings compositeIndexSettings;
199+
private final Consumer<IndexShard> replicator;
197200

198201
public IndexService(
199202
IndexSettings indexSettings,
@@ -231,7 +234,8 @@ public IndexService(
231234
RecoverySettings recoverySettings,
232235
RemoteStoreSettings remoteStoreSettings,
233236
FileCache fileCache,
234-
CompositeIndexSettings compositeIndexSettings
237+
CompositeIndexSettings compositeIndexSettings,
238+
Consumer<IndexShard> replicator
235239
) {
236240
super(indexSettings);
237241
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -306,11 +310,15 @@ public IndexService(
306310
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
307311
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
308312
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
313+
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
314+
this.asyncReplicationTask = new AsyncReplicationTask(this);
315+
}
309316
this.translogFactorySupplier = translogFactorySupplier;
310317
this.recoverySettings = recoverySettings;
311318
this.remoteStoreSettings = remoteStoreSettings;
312319
this.compositeIndexSettings = compositeIndexSettings;
313320
this.fileCache = fileCache;
321+
this.replicator = replicator;
314322
updateFsyncTaskIfNecessary();
315323
}
316324

@@ -387,7 +395,8 @@ public IndexService(
387395
recoverySettings,
388396
remoteStoreSettings,
389397
fileCache,
390-
null
398+
null,
399+
(s) -> {}
391400
);
392401
}
393402

@@ -463,7 +472,8 @@ public IndexService(
463472
recoverySettings,
464473
remoteStoreSettings,
465474
null,
466-
null
475+
null,
476+
s -> {}
467477
);
468478
}
469479

@@ -472,6 +482,11 @@ static boolean needsMapperService(IndexSettings indexSettings, IndexCreationCont
472482
&& indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
473483
}
474484

485+
// visible for tests
486+
AsyncReplicationTask getReplicationTask() {
487+
return asyncReplicationTask;
488+
}
489+
475490
/**
476491
* Context for index creation
477492
*
@@ -1142,11 +1157,22 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
11421157
}
11431158
onRefreshIntervalChange();
11441159
updateFsyncTaskIfNecessary();
1160+
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
1161+
updateReplicationTask();
1162+
}
11451163
}
11461164

11471165
metadataListeners.forEach(c -> c.accept(newIndexMetadata));
11481166
}
11491167

1168+
private void updateReplicationTask() {
1169+
try {
1170+
asyncReplicationTask.close();
1171+
} finally {
1172+
asyncReplicationTask = new AsyncReplicationTask(this);
1173+
}
1174+
}
1175+
11501176
/**
11511177
* Called whenever the refresh interval changes. This can happen in 2 cases -
11521178
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
@@ -1411,6 +1437,47 @@ public String toString() {
14111437
}
14121438
}
14131439

1440+
final class AsyncReplicationTask extends BaseAsyncTask {
1441+
1442+
AsyncReplicationTask(IndexService indexService) {
1443+
super(indexService, indexService.getRefreshInterval());
1444+
}
1445+
1446+
@Override
1447+
protected void runInternal() {
1448+
indexService.maybeSyncSegments(false);
1449+
}
1450+
1451+
@Override
1452+
protected String getThreadPool() {
1453+
return ThreadPool.Names.GENERIC;
1454+
}
1455+
1456+
@Override
1457+
public String toString() {
1458+
return "replication";
1459+
}
1460+
1461+
@Override
1462+
protected boolean mustReschedule() {
1463+
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
1464+
}
1465+
}
1466+
1467+
private void maybeSyncSegments(boolean force) {
1468+
if (getRefreshInterval().millis() > 0 || force) {
1469+
for (IndexShard shard : this.shards.values()) {
1470+
try {
1471+
if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) {
1472+
replicator.accept(shard);
1473+
}
1474+
} catch (IndexShardClosedException | AlreadyClosedException ex) {
1475+
// do nothing
1476+
}
1477+
}
1478+
}
1479+
}
1480+
14141481
final class AsyncTrimTranslogTask extends BaseAsyncTask {
14151482

14161483
AsyncTrimTranslogTask(IndexService indexService) {

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,12 +1253,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
12531253
return this.latestReplicationCheckpoint;
12541254
}
12551255

1256-
private boolean isPrimaryRelocation(String allocationId) {
1256+
// Decides whether replication lag timers should be skipped for the given allocation id.
// Returns true when a routing entry with that allocation id exists in the routing table and
// is either a primary (a relocating primary) or a search only replica (not tracked by primary).
// Returns false when no routing entry matches the allocation id.
1257+
private boolean shouldSkipReplicationTimer(String allocationId) {
12571258
Optional<ShardRouting> shardRouting = routingTable.shards()
12581259
.stream()
12591260
.filter(routing -> routing.allocationId().getId().equals(allocationId))
12601261
.findAny();
1261-
return shardRouting.isPresent() && shardRouting.get().primary();
1262+
return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly());
12621263
}
12631264

12641265
private void createReplicationLagTimers() {
@@ -1270,7 +1271,7 @@ private void createReplicationLagTimers() {
12701271
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
12711272
if (cps.inSync
12721273
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
1273-
&& isPrimaryRelocation(allocationId) == false
1274+
&& shouldSkipReplicationTimer(allocationId) == false
12741275
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
12751276
&& (indexSettings.isSegRepLocalEnabled() == true
12761277
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) {
@@ -1304,7 +1305,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
13041305
final CheckpointState cps = e.getValue();
13051306
if (cps.inSync
13061307
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
1307-
&& isPrimaryRelocation(e.getKey()) == false
1308+
&& shouldSkipReplicationTimer(e.getKey()) == false
13081309
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
13091310
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
13101311
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
@@ -1332,7 +1333,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
13321333
entry -> entry.getKey().equals(this.shardAllocationId) == false
13331334
&& entry.getValue().inSync
13341335
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
1335-
&& isPrimaryRelocation(entry.getKey()) == false
1336+
&& shouldSkipReplicationTimer(entry.getKey()) == false
13361337
/*Check if the current primary shard is migrating to remote and
13371338
all the other shard copies of the same index still hasn't completely moved over
13381339
to the remote enabled nodes. Ensures that:

0 commit comments

Comments
 (0)