Skip to content

Commit 8abdffb

Browse files
soosinhawangdongyu.danny
authored andcommitted
Use clusterUUID for discovery nodes remote path (opensearch-project#15143)
Signed-off-by: Sooraj Sinha <[email protected]>
1 parent 3d1d852 commit 8abdffb

File tree

2 files changed

+156
-1
lines changed

2 files changed

+156
-1
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
12+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.opensearch.client.Client;
14+
import org.opensearch.common.blobstore.BlobPath;
15+
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.common.util.FeatureFlags;
17+
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
18+
import org.opensearch.indices.recovery.RecoverySettings;
19+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
20+
import org.opensearch.repositories.RepositoriesService;
21+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
22+
import org.opensearch.repositories.fs.ReloadableFsRepository;
23+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
24+
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
25+
import org.junit.Before;
26+
27+
import java.io.IOException;
28+
import java.nio.charset.StandardCharsets;
29+
import java.util.Base64;
30+
import java.util.Locale;
31+
import java.util.Map;
32+
import java.util.function.Function;
33+
import java.util.stream.Collectors;
34+
35+
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
36+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
37+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
38+
import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS;
39+
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
40+
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
41+
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
42+
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
43+
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
44+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
45+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
46+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
47+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
48+
49+
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
50+
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {
51+
52+
private static String INDEX_NAME = "test-index";
53+
54+
@Before
55+
public void setup() {
56+
asyncUploadMockFsRepo = false;
57+
}
58+
59+
@Override
60+
protected Settings featureFlagSettings() {
61+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
62+
}
63+
64+
@Override
65+
protected Settings nodeSettings(int nodeOrdinal) {
66+
String routingTableRepoName = "remote-routing-repo";
67+
String routingTableRepoTypeAttributeKey = String.format(
68+
Locale.getDefault(),
69+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
70+
routingTableRepoName
71+
);
72+
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
73+
Locale.getDefault(),
74+
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
75+
routingTableRepoName
76+
);
77+
return Settings.builder()
78+
.put(super.nodeSettings(nodeOrdinal))
79+
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
80+
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
81+
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
82+
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
83+
.build();
84+
}
85+
86+
public void testPublication() throws Exception {
87+
// create cluster with multi node (3 master + 2 data)
88+
prepareCluster(3, 2, INDEX_NAME, 1, 2);
89+
ensureStableCluster(5);
90+
ensureGreen(INDEX_NAME);
91+
// update settings on a random node
92+
assertAcked(
93+
internalCluster().client()
94+
.admin()
95+
.cluster()
96+
.updateSettings(
97+
new ClusterUpdateSettingsRequest().persistentSettings(
98+
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "10mb").build()
99+
)
100+
)
101+
.actionGet()
102+
);
103+
104+
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
105+
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
106+
107+
Map<String, Integer> globalMetadataFiles = getMetadataFiles(repository, RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN);
108+
109+
assertTrue(globalMetadataFiles.containsKey(COORDINATION_METADATA));
110+
assertTrue(globalMetadataFiles.containsKey(SETTING_METADATA));
111+
assertTrue(globalMetadataFiles.containsKey(TRANSIENT_SETTING_METADATA));
112+
assertTrue(globalMetadataFiles.containsKey(TEMPLATES_METADATA));
113+
assertTrue(globalMetadataFiles.keySet().stream().anyMatch(key -> key.startsWith(CUSTOM_METADATA)));
114+
115+
Map<String, Integer> ephemeralMetadataFiles = getMetadataFiles(
116+
repository,
117+
RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN
118+
);
119+
120+
assertTrue(ephemeralMetadataFiles.containsKey(CLUSTER_BLOCKS));
121+
assertTrue(ephemeralMetadataFiles.containsKey(DISCOVERY_NODES));
122+
123+
Map<String, Integer> manifestFiles = getMetadataFiles(repository, RemoteClusterMetadataManifest.MANIFEST);
124+
assertTrue(manifestFiles.containsKey(RemoteClusterMetadataManifest.MANIFEST));
125+
126+
// get settings from each node and verify that it is updated
127+
Settings settings = clusterService().getSettings();
128+
logger.info("settings : {}", settings);
129+
for (Client client : clients()) {
130+
ClusterStateResponse response = client.admin().cluster().prepareState().clear().setMetadata(true).get();
131+
String refreshSetting = response.getState()
132+
.metadata()
133+
.settings()
134+
.get(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey());
135+
assertEquals("10mb", refreshSetting);
136+
}
137+
}
138+
139+
private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
140+
BlobPath metadataPath = repository.basePath()
141+
.add(
142+
Base64.getUrlEncoder()
143+
.withoutPadding()
144+
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
145+
)
146+
.add(RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN)
147+
.add(getClusterState().metadata().clusterUUID())
148+
.add(subDirectory);
149+
return repository.blobStore().blobContainer(metadataPath).listBlobs().keySet().stream().map(fileName -> {
150+
logger.info(fileName);
151+
return fileName.split(DELIMITER)[0];
152+
}).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));
153+
}
154+
155+
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ UploadedMetadataResults writeMetadataInParallel(
603603
new RemoteDiscoveryNodes(
604604
clusterState.nodes(),
605605
clusterState.version(),
606-
clusterState.stateUUID(),
606+
clusterState.metadata().clusterUUID(),
607607
blobStoreRepository.getCompressor()
608608
),
609609
listener

0 commit comments

Comments
 (0)