Skip to content

Commit 615a179

Browse files
author
Shailendra Singh
committed
Add integration tests for RemoteRoutingTable.
Signed-off-by: Shailendra Singh <[email protected]>
1 parent 58d1164 commit 615a179

File tree

2 files changed

+261
-1
lines changed

2 files changed

+261
-1
lines changed
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.junit.Before;
12+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.cluster.routing.IndexRoutingTable;
15+
import org.opensearch.cluster.routing.RoutingTable;
16+
import org.opensearch.common.blobstore.BlobPath;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.remote.RemoteStoreEnums;
19+
import org.opensearch.index.remote.RemoteStorePathStrategy;
20+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
21+
22+
import org.opensearch.repositories.RepositoriesService;
23+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
24+
import org.opensearch.test.OpenSearchIntegTestCase;
25+
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.ArrayList;
28+
import java.util.Base64;
29+
import java.util.List;
30+
import java.util.Set;
31+
import java.util.Optional;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
36+
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.*;
37+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
38+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
39+
40+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
41+
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
42+
private static final String INDEX_NAME = "test-index";
43+
44+
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
45+
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(RemoteClusterStateService.class);
46+
47+
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
48+
49+
@Before
50+
public void setup() {
51+
asyncUploadMockFsRepo = false;
52+
}
53+
54+
@Override
55+
protected Settings nodeSettings(int nodeOrdinal) {
56+
return Settings.builder()
57+
.put(super.nodeSettings(nodeOrdinal))
58+
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
59+
.put(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.toString())
60+
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
61+
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
62+
.build();
63+
}
64+
65+
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;
66+
67+
public void testRemoteRoutingTableIndexLifecycle() throws Exception {
68+
prepareCluster(1, 3, INDEX_NAME, 1, 1);
69+
ensureGreen(INDEX_NAME);
70+
71+
BlobPath baseMetadataPath = getBaseMetadataPath(repository);
72+
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
73+
BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID());
74+
75+
AtomicInteger indexRoutingFiles = new AtomicInteger();
76+
assertBusy(() -> {
77+
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
78+
assertTrue(indexRoutingFiles.get() > 0);
79+
});
80+
81+
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
82+
verifyUpdatesInManifestFile(remoteManifestManager);
83+
84+
List<Long> routingTableVersions = getRoutingTableVersionsFromAllNodes();
85+
assertTrue(areRoutingTableVersionsSame(routingTableVersions));
86+
87+
// Update index settings
88+
updateIndexSettings(INDEX_NAME, IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2);
89+
assertBusy(() -> {
90+
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
91+
assertTrue(indexRoutingFilesAfterUpdate > indexRoutingFiles.get());
92+
});
93+
94+
verifyUpdatesInManifestFile(remoteManifestManager);
95+
96+
routingTableVersions = getRoutingTableVersionsFromAllNodes();
97+
assertTrue(areRoutingTableVersionsSame(routingTableVersions));
98+
99+
// Delete the index and assert its deletion
100+
deleteIndexAndVerify(remoteManifestManager);
101+
102+
routingTableVersions = getRoutingTableVersionsFromAllNodes();
103+
assertTrue(areRoutingTableVersionsSame(routingTableVersions));
104+
}
105+
106+
public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
107+
prepareCluster(1, 3, INDEX_NAME, 1, 1);
108+
ensureGreen(INDEX_NAME);
109+
110+
BlobPath baseMetadataPath = getBaseMetadataPath(repository);
111+
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
112+
BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID());
113+
114+
AtomicInteger indexRoutingFiles = new AtomicInteger();
115+
assertBusy(() -> {
116+
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
117+
assertTrue(indexRoutingFiles.get() > 0);
118+
});
119+
120+
List<Long> routingTableVersions = getRoutingTableVersionsFromAllNodes();
121+
assertTrue(areRoutingTableVersionsSame(routingTableVersions));
122+
123+
// Ensure node comes healthy after restart
124+
Set<String> dataNodes = internalCluster().getDataNodeNames();
125+
internalCluster().restartNode(randomFrom(dataNodes));
126+
ensureGreen();
127+
ensureGreen(INDEX_NAME);
128+
129+
// ensure restarted node joins and the cluster is stable
130+
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
131+
ensureStableCluster(4);
132+
133+
assertBusy(() -> {
134+
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
135+
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
136+
});
137+
138+
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
139+
verifyUpdatesInManifestFile(remoteManifestManager);
140+
}
141+
142+
public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
143+
prepareCluster(1, 3, INDEX_NAME, 1, 1);
144+
ensureGreen(INDEX_NAME);
145+
146+
BlobPath baseMetadataPath = getBaseMetadataPath(repository);
147+
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
148+
BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID());
149+
150+
AtomicInteger indexRoutingFiles = new AtomicInteger();
151+
assertBusy(() -> {
152+
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
153+
assertTrue(indexRoutingFiles.get() > 0);
154+
});
155+
156+
List<Long> routingTableVersions = getRoutingTableVersionsFromAllNodes();
157+
assertTrue(areRoutingTableVersionsSame(routingTableVersions));
158+
159+
// Ensure node comes healthy after restart
160+
String clusterManagerName = internalCluster().getClusterManagerName();
161+
internalCluster().restartNode(clusterManagerName);
162+
ensureGreen();
163+
ensureGreen(INDEX_NAME);
164+
165+
// ensure master is elected and the cluster is stable
166+
assertEquals(1, internalCluster().clusterService().state().nodes().getClusterManagerNode());
167+
ensureStableCluster(4);
168+
169+
assertBusy(() -> {
170+
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
171+
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
172+
});
173+
174+
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
175+
verifyUpdatesInManifestFile(remoteManifestManager);
176+
}
177+
178+
private BlobPath getBaseMetadataPath(BlobStoreRepository repository) {
179+
return repository.basePath()
180+
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)))
181+
.add("cluster-state")
182+
.add(getClusterState().metadata().clusterUUID());
183+
}
184+
185+
private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID) {
186+
RemoteStoreEnums.PathHashAlgorithm pathHashAlgo = RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64;
187+
return pathType.path(
188+
RemoteStorePathStrategy.PathInput.builder()
189+
.basePath(indexRoutingPath)
190+
.indexUUID(indexUUID)
191+
.build(),
192+
pathHashAlgo
193+
);
194+
}
195+
196+
private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) {
197+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
198+
getClusterState().getClusterName().value(),
199+
getClusterState().getMetadata().clusterUUID()
200+
);
201+
assertTrue(latestManifest.isPresent());
202+
ClusterMetadataManifest manifest = latestManifest.get();
203+
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME));
204+
assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty());
205+
}
206+
207+
private List<Long> getRoutingTableVersionsFromAllNodes() throws ExecutionException, InterruptedException {
208+
String[] allNodes = internalCluster().getNodeNames();
209+
List<Long> routingTableVersions = new ArrayList<>();
210+
for (String node : allNodes) {
211+
RoutingTable routingTable = internalCluster().client(node)
212+
.admin()
213+
.cluster()
214+
.state(new ClusterStateRequest().local(true))
215+
.get()
216+
.getState()
217+
.routingTable();
218+
routingTableVersions.add(routingTable.version());
219+
}
220+
return routingTableVersions;
221+
}
222+
223+
private void updateIndexSettings(String indexName, String settingKey, int settingValue) {
224+
client().admin()
225+
.indices()
226+
.prepareUpdateSettings(indexName)
227+
.setSettings(Settings.builder().put(settingKey, settingValue))
228+
.execute()
229+
.actionGet();
230+
}
231+
232+
private void deleteIndexAndVerify(RemoteManifestManager remoteManifestManager) {
233+
client().admin().indices().prepareDelete(INDEX_NAME).execute().actionGet();
234+
assertFalse(client().admin().indices().prepareExists(INDEX_NAME).get().isExists());
235+
236+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
237+
getClusterState().getClusterName().value(),
238+
getClusterState().getMetadata().clusterUUID()
239+
);
240+
assertTrue(latestManifest.isPresent());
241+
ClusterMetadataManifest manifest = latestManifest.get();
242+
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty());
243+
assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME));
244+
}
245+
246+
private boolean areRoutingTableVersionsSame(List<Long> routingTableVersions) {
247+
if (routingTableVersions == null || routingTableVersions.isEmpty()) {
248+
return false;
249+
}
250+
251+
Long firstVersion = routingTableVersions.get(0);
252+
for (Long routingTableVersion : routingTableVersions) {
253+
if (!firstVersion.equals(routingTableVersion)) {
254+
logger.info("Responses are not the same: {} {}", firstVersion, routingTableVersion);
255+
return false;
256+
}
257+
}
258+
return true;
259+
}
260+
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1437,7 +1437,7 @@ public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {
14371437
this.remoteStateReadTimeout = remoteStateReadTimeout;
14381438
}
14391439

1440-
private BlobStoreTransferService getBlobStoreTransferService() {
1440+
public BlobStoreTransferService getBlobStoreTransferService() {
14411441
if (blobStoreTransferService == null) {
14421442
blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool);
14431443
}

0 commit comments

Comments
 (0)