Skip to content

Commit ecb65cf

Browse files
shourya035kaushalmahi12
authored andcommitted
[Remote Store Migration] Reconcile remote store based index settings during STRICT mode switch (opensearch-project#14792)
Signed-off-by: Shourya Dutta Biswas <[email protected]> Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 2caebaf commit ecb65cf

File tree

6 files changed

+375
-116
lines changed

6 files changed

+375
-116
lines changed

server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.atomic.AtomicBoolean;
4747
import java.util.concurrent.atomic.AtomicLong;
4848

49+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
4950
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
5051
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
5152
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
@@ -277,4 +278,30 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe
277278
IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
278279
return indexService.getShard(0);
279280
}
281+
282+
public void changeReplicaCountAndEnsureGreen(int replicaCount, String indexName) {
283+
assertAcked(
284+
client().admin()
285+
.indices()
286+
.prepareUpdateSettings(indexName)
287+
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
288+
);
289+
ensureGreen(indexName);
290+
}
291+
292+
public void completeDocRepToRemoteMigration() {
293+
assertTrue(
294+
internalCluster().client()
295+
.admin()
296+
.cluster()
297+
.prepareUpdateSettings()
298+
.setPersistentSettings(
299+
Settings.builder()
300+
.putNull(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey())
301+
.putNull(MIGRATION_DIRECTION_SETTING.getKey())
302+
)
303+
.get()
304+
.isAcknowledged()
305+
);
306+
}
280307
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,73 @@ public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
546546
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));
547547
}
548548

549+
/**
550+
* Scenario:
551+
* Creates an index with 1 pri 1 rep setup with 3 docrep nodes (1 cluster manager + 2 data nodes),
552+
* initiate migration and create 3 remote nodes (1 cluster manager + 2 data nodes) and moves over
553+
* only primary shard copy of the index
554+
* After the primary shard copy is relocated, decrease replica count to 0, stop all docrep nodes
555+
* and conclude migration. Remote store index settings should be applied to the index at this point.
556+
*/
557+
public void testIndexSettingsUpdateDuringReplicaCountDecrement() throws Exception {
558+
String indexName = "migration-index-replica-decrement";
559+
String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();
560+
561+
logger.info("---> Starting 2 docrep nodes");
562+
List<String> docrepNodeNames = internalCluster().startDataOnlyNodes(2);
563+
internalCluster().validateClusterFormed();
564+
565+
logger.info("---> Creating index with 1 primary and 1 replica");
566+
Settings oneReplica = Settings.builder()
567+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
568+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
569+
.build();
570+
createIndexAndAssertDocrepProperties(indexName, oneReplica);
571+
572+
int docsToIndex = randomIntBetween(10, 100);
573+
logger.info("---> Indexing {} on both indices", docsToIndex);
574+
indexBulk(indexName, docsToIndex);
575+
576+
logger.info(
577+
"---> Stopping shard rebalancing to ensure shards do not automatically move over to newer nodes after they are launched"
578+
);
579+
stopShardRebalancing();
580+
581+
logger.info("---> Starting 3 remote store enabled nodes");
582+
initDocRepToRemoteMigration();
583+
setAddRemote(true);
584+
internalCluster().startClusterManagerOnlyNode();
585+
List<String> remoteNodeNames = internalCluster().startDataOnlyNodes(2);
586+
internalCluster().validateClusterFormed();
587+
588+
String primaryNode = primaryNodeName(indexName);
589+
590+
logger.info("---> Moving over primary to remote store enabled nodes");
591+
assertAcked(
592+
client().admin()
593+
.cluster()
594+
.prepareReroute()
595+
.add(new MoveAllocationCommand(indexName, 0, primaryNode, remoteNodeNames.get(0)))
596+
.execute()
597+
.actionGet()
598+
);
599+
waitForRelocation();
600+
waitNoPendingTasksOnAll();
601+
602+
logger.info("---> Reducing replica count to 0 for the index");
603+
changeReplicaCountAndEnsureGreen(0, indexName);
604+
605+
logger.info("---> Stopping all docrep nodes");
606+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(docrepClusterManager));
607+
for (String node : docrepNodeNames) {
608+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node));
609+
}
610+
internalCluster().validateClusterFormed();
611+
completeDocRepToRemoteMigration();
612+
waitNoPendingTasksOnAll();
613+
assertRemoteProperties(indexName);
614+
}
615+
549616
private void createIndexAndAssertDocrepProperties(String index, Settings settings) {
550617
createIndexAssertHealthAndDocrepProperties(index, settings, this::ensureGreen);
551618
}

server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.opensearch.cluster.ClusterState;
4343
import org.opensearch.cluster.block.ClusterBlockException;
4444
import org.opensearch.cluster.block.ClusterBlockLevel;
45-
import org.opensearch.cluster.metadata.IndexMetadata;
4645
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
4746
import org.opensearch.cluster.metadata.Metadata;
4847
import org.opensearch.cluster.node.DiscoveryNode;
@@ -59,17 +58,13 @@
5958
import org.opensearch.common.settings.SettingsException;
6059
import org.opensearch.core.action.ActionListener;
6160
import org.opensearch.core.common.io.stream.StreamInput;
62-
import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater;
6361
import org.opensearch.node.remotestore.RemoteStoreNodeService;
6462
import org.opensearch.threadpool.ThreadPool;
6563
import org.opensearch.transport.TransportService;
6664

6765
import java.io.IOException;
68-
import java.util.Collection;
69-
import java.util.Set;
70-
import java.util.stream.Collectors;
7166

72-
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasAllRemoteStoreRelatedMetadata;
67+
import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;
7368

7469
/**
7570
* Transport action for updating cluster settings
@@ -262,13 +257,14 @@ public void onFailure(String source, Exception e) {
262257

263258
@Override
264259
public ClusterState execute(final ClusterState currentState) {
265-
validateCompatibilityModeSettingRequest(request, state);
266-
final ClusterState clusterState = updater.updateSettings(
260+
boolean isCompatibilityModeChanging = validateCompatibilityModeSettingRequest(request, state);
261+
ClusterState clusterState = updater.updateSettings(
267262
currentState,
268263
clusterSettings.upgradeSettings(request.transientSettings()),
269264
clusterSettings.upgradeSettings(request.persistentSettings()),
270265
logger
271266
);
267+
clusterState = checkAndFinalizeRemoteStoreMigration(isCompatibilityModeChanging, request, clusterState, logger);
272268
changed = clusterState != currentState;
273269
return clusterState;
274270
}
@@ -278,19 +274,23 @@ public ClusterState execute(final ClusterState currentState) {
278274

279275
/**
280276
* Runs various checks associated with changing cluster compatibility mode
277+
*
281278
* @param request cluster settings update request, for settings to be updated and new values
282279
* @param clusterState current state of cluster, for information on nodes
280+
* @return true if the incoming cluster settings update request is switching compatibility modes
283281
*/
284-
public void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
282+
public boolean validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
285283
Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
286284
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) {
287-
String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode;
288285
validateAllNodesOfSameVersion(clusterState.nodes());
289-
if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) {
286+
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(
287+
settings
288+
) == RemoteStoreNodeService.CompatibilityMode.STRICT) {
290289
validateAllNodesOfSameType(clusterState.nodes());
291-
validateIndexSettings(clusterState);
292290
}
291+
return true;
293292
}
293+
return false;
294294
}
295295

296296
/**
@@ -310,31 +310,18 @@ private void validateAllNodesOfSameVersion(DiscoveryNodes discoveryNodes) {
310310
* @param discoveryNodes current discovery nodes in the cluster
311311
*/
312312
private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) {
313-
Set<Boolean> nodeTypes = discoveryNodes.getNodes()
313+
boolean allNodesDocrepEnabled = discoveryNodes.getNodes()
314314
.values()
315315
.stream()
316-
.map(DiscoveryNode::isRemoteStoreNode)
317-
.collect(Collectors.toSet());
318-
if (nodeTypes.size() != 1) {
316+
.allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode() == false);
317+
boolean allNodesRemoteStoreEnabled = discoveryNodes.getNodes()
318+
.values()
319+
.stream()
320+
.allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode());
321+
if (allNodesDocrepEnabled == false && allNodesRemoteStoreEnabled == false) {
319322
throw new SettingsException(
320323
"can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes"
321324
);
322325
}
323326
}
324-
325-
/**
326-
* Verifies that while trying to switch to STRICT compatibility mode,
327-
* all indices in the cluster have {@link RemoteMigrationIndexMetadataUpdater#indexHasAllRemoteStoreRelatedMetadata(IndexMetadata)} as <code>true</code>.
328-
* If not, throws {@link SettingsException}
329-
* @param clusterState current cluster state
330-
*/
331-
private void validateIndexSettings(ClusterState clusterState) {
332-
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
333-
if (allIndicesMetadata.isEmpty() == false
334-
&& allIndicesMetadata.stream().anyMatch(indexMetadata -> indexHasAllRemoteStoreRelatedMetadata(indexMetadata) == false)) {
335-
throw new SettingsException(
336-
"can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings"
337-
);
338-
}
339-
}
340327
}

server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,33 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.Version;
14+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1415
import org.opensearch.cluster.ClusterState;
1516
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.cluster.metadata.Metadata;
1618
import org.opensearch.cluster.node.DiscoveryNode;
1719
import org.opensearch.cluster.node.DiscoveryNodes;
20+
import org.opensearch.cluster.routing.RoutingTable;
1821
import org.opensearch.common.collect.Tuple;
1922
import org.opensearch.common.settings.Settings;
2023
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
24+
import org.opensearch.node.remotestore.RemoteStoreNodeService;
2125

2226
import java.nio.ByteBuffer;
2327
import java.util.Arrays;
2428
import java.util.Base64;
29+
import java.util.Collection;
30+
import java.util.Collections;
2531
import java.util.HashMap;
2632
import java.util.List;
2733
import java.util.Locale;
2834
import java.util.Map;
2935
import java.util.Objects;
3036
import java.util.Optional;
3137
import java.util.function.Function;
38+
import java.util.stream.Collectors;
3239

40+
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
3341
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
3442
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
3543
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
@@ -250,4 +258,119 @@ public static Map<String, String> getRemoteStoreRepoName(DiscoveryNodes discover
250258
.findFirst();
251259
return remoteNode.map(RemoteStoreNodeAttribute::getDataRepoNames).orElseGet(HashMap::new);
252260
}
261+
262+
/**
263+
* Invoked after a cluster settings update.
264+
* Checks if the applied cluster settings has switched the cluster to STRICT mode.
265+
* If so, checks and applies appropriate index settings depending on the current set
266+
* of node types in the cluster
267+
* This has been intentionally done after the cluster settings update
268+
* flow. That way we are not interfering with the usual settings update
269+
* and the cluster state mutation that comes along with it
270+
*
271+
* @param isCompatibilityModeChanging flag passed from cluster settings update call to denote if a compatibility mode change has been done
272+
* @param request request payload passed from cluster settings update
273+
* @param currentState cluster state generated after changing cluster settings were applied
274+
* @param logger Logger reference
275+
* @return Mutated cluster state with remote store index settings applied, no-op if the cluster is not switching to `STRICT` compatibility mode
276+
*/
277+
public static ClusterState checkAndFinalizeRemoteStoreMigration(
278+
boolean isCompatibilityModeChanging,
279+
ClusterUpdateSettingsRequest request,
280+
ClusterState currentState,
281+
Logger logger
282+
) {
283+
if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) {
284+
return finalizeMigration(currentState, logger);
285+
}
286+
return currentState;
287+
}
288+
289+
/**
290+
* Finalizes the docrep to remote-store migration process by applying remote store based index settings
291+
* on indices that are missing them. No-Op if all indices already have the settings applied through
292+
* IndexMetadataUpdater
293+
*
294+
* @param incomingState mutated cluster state after cluster settings were applied
295+
* @return new cluster state with index settings updated
296+
*/
297+
public static ClusterState finalizeMigration(ClusterState incomingState, Logger logger) {
298+
Map<String, DiscoveryNode> discoveryNodeMap = incomingState.nodes().getNodes();
299+
if (discoveryNodeMap.isEmpty() == false) {
300+
// At this point, we have already validated that all nodes in the cluster are of uniform type.
301+
// Either all of them are remote store enabled, or all of them are docrep enabled
302+
boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode();
303+
if (remoteStoreEnabledNodePresent == true) {
304+
List<IndexMetadata> indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState, logger);
305+
if (indicesWithoutRemoteStoreSettings.isEmpty() == true) {
306+
logger.info("All indices in the cluster has remote store based index settings");
307+
} else {
308+
Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings, logger);
309+
return ClusterState.builder(incomingState).metadata(mutatedMetadata).build();
310+
}
311+
} else {
312+
logger.debug("All nodes in the cluster are not remote nodes. Skipping.");
313+
}
314+
}
315+
return incomingState;
316+
}
317+
318+
/**
319+
* Filters out indices which does not have remote store based
320+
* index settings applied even after all shard copies have
321+
* migrated to remote store enabled nodes
322+
*/
323+
private static List<IndexMetadata> getIndicesWithoutRemoteStoreSettings(ClusterState clusterState, Logger logger) {
324+
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
325+
if (allIndicesMetadata.isEmpty() == false) {
326+
List<IndexMetadata> indicesWithoutRemoteSettings = allIndicesMetadata.stream()
327+
.filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false)
328+
.collect(Collectors.toList());
329+
logger.debug(
330+
"Attempting to switch to strict mode. Count of indices without remote store settings {}",
331+
indicesWithoutRemoteSettings.size()
332+
);
333+
return indicesWithoutRemoteSettings;
334+
}
335+
return Collections.emptyList();
336+
}
337+
338+
/**
339+
* Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater}
340+
*/
341+
private static Metadata applyRemoteStoreSettings(
342+
ClusterState clusterState,
343+
List<IndexMetadata> indicesWithoutRemoteStoreSettings,
344+
Logger logger
345+
) {
346+
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata());
347+
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
348+
DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes();
349+
Settings currentClusterSettings = clusterState.metadata().settings();
350+
for (IndexMetadata indexMetadata : indicesWithoutRemoteStoreSettings) {
351+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata);
352+
RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater(
353+
currentDiscoveryNodes,
354+
currentRoutingTable,
355+
indexMetadata,
356+
currentClusterSettings,
357+
logger
358+
);
359+
indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName());
360+
metadataBuilder.put(indexMetadataBuilder);
361+
}
362+
return metadataBuilder.build();
363+
}
364+
365+
/**
366+
* Checks if the incoming cluster settings payload is attempting to switch
367+
* the cluster to `STRICT` compatibility mode
368+
* Visible only for tests
369+
*/
370+
public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsRequest request) {
371+
Settings incomingSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
372+
return RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(
373+
incomingSettings
374+
) == RemoteStoreNodeService.CompatibilityMode.STRICT;
375+
}
253376
}

0 commit comments

Comments
 (0)