|
11 | 11 | import org.opensearch.action.DocWriteResponse; |
12 | 12 | import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; |
13 | 13 | import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; |
| 14 | +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; |
| 15 | +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; |
14 | 16 | import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; |
15 | 17 | import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; |
16 | 18 | import org.opensearch.action.admin.indices.recovery.RecoveryResponse; |
|
25 | 27 | import org.opensearch.common.blobstore.BlobPath; |
26 | 28 | import org.opensearch.common.io.PathUtils; |
27 | 29 | import org.opensearch.common.settings.Settings; |
| 30 | +import org.opensearch.common.unit.TimeValue; |
28 | 31 | import org.opensearch.common.util.io.IOUtils; |
29 | 32 | import org.opensearch.core.common.unit.ByteSizeUnit; |
30 | 33 | import org.opensearch.core.index.Index; |
|
43 | 46 | import org.opensearch.repositories.RepositoryData; |
44 | 47 | import org.opensearch.repositories.blobstore.BlobStoreRepository; |
45 | 48 | import org.opensearch.repositories.fs.FsRepository; |
46 | | -import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; |
47 | 49 | import org.opensearch.snapshots.SnapshotInfo; |
48 | 50 | import org.opensearch.snapshots.SnapshotRestoreException; |
49 | 51 | import org.opensearch.snapshots.SnapshotState; |
50 | 52 | import org.opensearch.test.InternalTestCluster; |
51 | 53 | import org.opensearch.test.OpenSearchIntegTestCase; |
52 | | -import org.junit.After; |
53 | | -import org.junit.Before; |
54 | 54 |
|
55 | 55 | import java.io.IOException; |
56 | 56 | import java.nio.file.Files; |
|
63 | 63 | import java.util.Objects; |
64 | 64 | import java.util.Optional; |
65 | 65 | import java.util.concurrent.ExecutionException; |
| 66 | +import java.util.concurrent.TimeUnit; |
66 | 67 | import java.util.stream.Collectors; |
67 | 68 | import java.util.stream.Stream; |
68 | 69 |
|
|
79 | 80 | import static org.hamcrest.Matchers.lessThanOrEqualTo; |
80 | 81 |
|
81 | 82 | @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) |
82 | | -public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase { |
83 | | - private static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; |
84 | | - private Path remoteRepoPath; |
85 | | - |
86 | | - @Before |
87 | | - public void setup() { |
88 | | - remoteRepoPath = randomRepoPath().toAbsolutePath(); |
89 | | - } |
90 | | - |
91 | | - @After |
92 | | - public void teardown() { |
93 | | - clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get(); |
94 | | - } |
95 | | - |
96 | | - @Override |
97 | | - protected Settings nodeSettings(int nodeOrdinal) { |
98 | | - return Settings.builder() |
99 | | - .put(super.nodeSettings(nodeOrdinal)) |
100 | | - .put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath)) |
101 | | - .build(); |
102 | | - } |
103 | | - |
104 | | - private Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) { |
105 | | - Settings.Builder settingsBuilder = Settings.builder() |
106 | | - .put(super.indexSettings()) |
107 | | - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) |
108 | | - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) |
109 | | - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s"); |
110 | | - return settingsBuilder; |
111 | | - } |
112 | | - |
113 | | - private void indexDocuments(Client client, String indexName, int numOfDocs) { |
114 | | - indexDocuments(client, indexName, 0, numOfDocs); |
115 | | - } |
116 | | - |
117 | | - private void indexDocuments(Client client, String indexName, int fromId, int toId) { |
118 | | - for (int i = fromId; i < toId; i++) { |
119 | | - String id = Integer.toString(i); |
120 | | - client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get(); |
121 | | - } |
122 | | - client.admin().indices().prepareFlush(indexName).get(); |
123 | | - } |
| 83 | +public class RemoteRestoreSnapshotIT extends RemoteSnapshotIT { |
124 | 84 |
|
125 | 85 | private void assertDocsPresentInIndex(Client client, String indexName, int numOfDocs) { |
126 | 86 | for (int i = 0; i < numOfDocs; i++) { |
@@ -997,6 +957,75 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio |
997 | 957 | assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); |
998 | 958 | } |
999 | 959 |
|
| 960 | + public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception { |
| 961 | + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); |
| 962 | + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); |
| 963 | + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); |
| 964 | + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); |
| 965 | + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); |
| 966 | + String indexName1 = "testindex1"; |
| 967 | + String indexName2 = "testindex2"; |
| 968 | + String snapshotRepoName = "test-create-snapshot-repo"; |
| 969 | + Path absolutePath1 = randomRepoPath().toAbsolutePath(); |
| 970 | + logger.info("Snapshot Path [{}]", absolutePath1); |
| 971 | + |
| 972 | + Settings.Builder settings = Settings.builder() |
| 973 | + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) |
| 974 | + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) |
| 975 | + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) |
| 976 | + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) |
| 977 | + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); |
| 978 | + createRepository(snapshotRepoName, FsRepository.TYPE, settings); |
| 979 | + |
| 980 | + Client client = client(); |
| 981 | + Settings indexSettings = getIndexSettings(20, 0).build(); |
| 982 | + createIndex(indexName1, indexSettings); |
| 983 | + |
| 984 | + Settings indexSettings2 = getIndexSettings(15, 0).build(); |
| 985 | + createIndex(indexName2, indexSettings2); |
| 986 | + |
| 987 | + final int numDocsInIndex1 = 10; |
| 988 | + final int numDocsInIndex2 = 20; |
| 989 | + indexDocuments(client, indexName1, numDocsInIndex1); |
| 990 | + indexDocuments(client, indexName2, numDocsInIndex2); |
| 991 | + ensureGreen(indexName1, indexName2); |
| 992 | + |
| 993 | + Thread thread = new Thread(() -> { |
| 994 | + try { |
| 995 | + String snapshotName = "snapshot-earlier-master"; |
| 996 | + internalCluster().nonClusterManagerClient() |
| 997 | + .admin() |
| 998 | + .cluster() |
| 999 | + .prepareCreateSnapshot(snapshotRepoName, snapshotName) |
| 1000 | + .setWaitForCompletion(true) |
| 1001 | + .setMasterNodeTimeout(TimeValue.timeValueSeconds(60)) |
| 1002 | + .get(); |
| 1003 | + |
| 1004 | + } catch (Exception ignored) {} |
| 1005 | + }); |
| 1006 | + thread.start(); |
| 1007 | + |
| 1008 | + // stop existing master |
| 1009 | + final String clusterManagerNode = internalCluster().getClusterManagerName(); |
| 1010 | + stopNode(clusterManagerNode); |
| 1011 | + |
| 1012 | + // Validate that we have greater one snapshot has been created |
| 1013 | + String snapshotName = "new-snapshot"; |
| 1014 | + try { |
| 1015 | + client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get(); |
| 1016 | + } catch (Exception e) { |
| 1017 | + logger.info("Exception while creating new-snapshot", e); |
| 1018 | + } |
| 1019 | + |
| 1020 | + // Validate that snapshot is present in repository data |
| 1021 | + assertBusy(() -> { |
| 1022 | + GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName); |
| 1023 | + GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet(); |
| 1024 | + assertThat(response2.getSnapshots().size(), greaterThanOrEqualTo(1)); |
| 1025 | + }, 30, TimeUnit.SECONDS); |
| 1026 | + thread.join(); |
| 1027 | + } |
| 1028 | + |
1000 | 1029 | public void testCreateSnapshotV2WithRedIndex() throws Exception { |
1001 | 1030 | internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); |
1002 | 1031 | internalCluster().startDataOnlyNode(pinnedTimestampSettings()); |
@@ -1315,11 +1344,4 @@ public void testConcurrentV1SnapshotAndV2RepoSettingUpdate() throws Exception { |
1315 | 1344 | createV1SnapshotThread.join(); |
1316 | 1345 | } |
1317 | 1346 |
|
1318 | | - private Settings pinnedTimestampSettings() { |
1319 | | - Settings settings = Settings.builder() |
1320 | | - .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) |
1321 | | - .build(); |
1322 | | - return settings; |
1323 | | - } |
1324 | | - |
1325 | 1347 | } |
0 commit comments