diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 7b0dc19a92807..9929a614ea3e4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -8,6 +8,9 @@ package org.opensearch.remotestore; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.action.DocWriteResponse; import org.opensearch.action.LatchedActionListener; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; @@ -30,6 +33,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexService; @@ -37,10 +41,12 @@ import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.indices.IndicesService; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -60,6 +66,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -77,6 +84,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; @@ -89,9 +97,31 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteRestoreSnapshotIT extends RemoteSnapshotIT { + public RemoteRestoreSnapshotIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + private void assertDocsPresentInIndex(Client client, String indexName, int numOfDocs) { for (int i = 0; i < numOfDocs; i++) { String id = Integer.toString(i); @@ -102,7 +132,7 @@ private void assertDocsPresentInIndex(Client client, String indexName, int numOf public void testRestoreOperationsShallowCopyEnabled() throws Exception { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String primary = internalCluster().startDataOnlyNode(); + String primary = internalCluster().startDataAndWarmNodes(1).get(0); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -128,7 +158,7 @@ public void testRestoreOperationsShallowCopyEnabled() throws Exception { indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); logger.info("--> snapshot"); SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1, indexName2))); @@ -200,7 +230,7 @@ public void testRestoreOperationsShallowCopyEnabled() throws Exception { */ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -313,7 +343,7 @@ private void validatePathType(String index, PathType pathType, @Nullable PathHas public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String primary = internalCluster().startDataOnlyNode(); + String primary = internalCluster().startDataAndWarmNodes(1).get(0); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -339,7 +369,7 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); logger.info("--> snapshot"); SnapshotInfo snapshotInfo1 = createSnapshot( snapshotRepoName, @@ -441,7 +471,7 @@ void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException { public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); + internalCluster().startDataAndWarmNodes(2); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -500,7 +530,7 @@ public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, Exe public void testSuccessfulIndexRestoredFromSnapshotWithUpdatedSetting() throws IOException, ExecutionException, InterruptedException { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); + internalCluster().startDataAndWarmNodes(2); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -716,7 +746,7 @@ public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionExcep public void testInvalidRestoreRequestScenarios() throws Exception { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String index = "test-index"; String snapshotRepo = "test-restore-snapshot-repo"; String newRemoteStoreRepo = "test-new-rs-repo"; @@ -736,7 +766,7 @@ public void testInvalidRestoreRequestScenarios() throws Exception { indexDocuments(client, index, numDocsInIndex); ensureGreen(index); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); logger.info("--> snapshot"); SnapshotInfo snapshotInfo = createSnapshot(snapshotRepo, snapshotName1, new ArrayList<>(List.of(index))); @@ -896,8 +926,8 @@ public void testInvalidRestoreRequestScenarios() throws Exception { public void testCreateSnapshotV2_Orphan_Timestamp_Cleanup() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -962,8 +992,8 @@ public void onFailure(Exception e) {} public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String indexName3 = "testindex3"; @@ -1034,8 +1064,8 @@ public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -1108,8 +1138,8 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Except internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -1184,8 +1214,8 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Except public void testCreateSnapshotV2() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String indexName3 = "testindex3"; @@ -1254,8 +1284,8 @@ public void forceSyncPinnedTimestamps() { public void testCreateSnapshotV2WithRedIndex() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -1302,8 +1332,8 @@ public void testCreateSnapshotV2WithRedIndex() throws Exception { public void testCreateSnapshotV2WithIndexingLoad() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -1370,8 +1400,8 @@ public void testCreateSnapshotV2WithIndexingLoad() throws Exception { public void testCreateSnapshotV2WithShallowCopySettingDisabled() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -1419,7 +1449,7 @@ public void testCreateSnapshotV2WithShallowCopySettingDisabled() throws Exceptio public void testClusterManagerFailoverDuringSnapshotCreation() throws Exception { internalCluster().startClusterManagerOnlyNodes(3, pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -1493,8 +1523,8 @@ public void testClusterManagerFailoverDuringSnapshotCreation() throws Exception public void testConcurrentV1SnapshotAndV2RepoSettingUpdate() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String snapshotRepoName = "test-create-snapshot-repo"; String snapshotName1 = "test-create-snapshot-v1"; Path absolutePath1 = randomRepoPath().toAbsolutePath(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java index 89859c743eaaa..da618ffb64bee 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java @@ -13,8 +13,10 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.Node; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.transport.client.Client; @@ -24,9 +26,20 @@ import java.nio.file.Path; import java.util.concurrent.ExecutionException; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public abstract class RemoteSnapshotIT extends AbstractSnapshotIntegTestCase { + + public RemoteSnapshotIT(Settings nodeSettings) { + super(nodeSettings); + } + + public RemoteSnapshotIT() { + super(); + } + protected static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; protected Path remoteRepoPath; @@ -38,6 +51,16 @@ public void setup() { @After public void teardown() { clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get(); + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + assertAcked(client().admin().indices().prepareDelete("_all").get()); + var nodes = internalCluster().getDataNodeInstances(Node.class); + for (var node : nodes) { + var fileCache = node.fileCache(); + if (fileCache != null) { + fileCache.clear(); + } + } + } } @Override @@ -61,6 +84,9 @@ protected Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s"); + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + settingsBuilder.put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true); + } return settingsBuilder; } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java index 1583078782611..19c84b818d692 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java @@ -8,6 +8,9 @@ package org.opensearch.remotestore; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; @@ -27,17 +30,22 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.indices.IndicesService; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; @@ -57,6 +65,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,6 +79,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; @@ -83,12 +93,17 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RestoreShallowSnapshotV2IT extends AbstractSnapshotIntegTestCase { private static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; private Path remoteRepoPath; + public RestoreShallowSnapshotV2IT(Settings nodeSettings) { + super(nodeSettings); + } + @Before public void setup() { remoteRepoPath = randomRepoPath().toAbsolutePath(); @@ -97,14 +112,32 @@ public void setup() { @After public void teardown() { clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get(); + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + assertAcked(client().admin().indices().prepareDelete("_all").get()); + var nodes = internalCluster().getDataNodeInstances(Node.class); + for (var node : nodes) { + var fileCache = node.fileCache(); + fileCache.clear(); + } + } + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); } @Override protected Settings nodeSettings(int nodeOrdinal) { + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath)) .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) .build(); } @@ -137,6 +170,9 @@ private Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) { .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s"); + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + settingsBuilder.put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true); + } return settingsBuilder; } @@ -161,7 +197,7 @@ private void assertDocsPresentInIndex(Client client, String indexName, int numOf public void testRestoreOperationsShallowCopyEnabled() throws Exception { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String primary = internalCluster().startDataOnlyNode(); + String primary = internalCluster().startDataAndWarmNodes(1).get(0); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -187,7 +223,7 @@ public void testRestoreOperationsShallowCopyEnabled() throws Exception { indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); logger.info("--> snapshot"); SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>()); @@ -255,7 +291,7 @@ public void testRestoreOperationsShallowCopyEnabled() throws Exception { public void testRemoteStoreCustomDataOnIndexCreationAndRestore() { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -378,7 +414,7 @@ private void validatePathType( public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String primary = internalCluster().startDataOnlyNode(); + String primary = internalCluster().startDataAndWarmNodes(1).get(0); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -404,7 +440,7 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); logger.info("--> snapshot"); SnapshotInfo snapshotInfo1 = createSnapshot( snapshotRepoName, @@ -506,7 +542,7 @@ void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException { public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); + internalCluster().startDataAndWarmNodes(2); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -565,7 +601,7 @@ public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, Exe public void testSuccessfulIndexRestoredFromSnapshotWithUpdatedSetting() throws IOException, ExecutionException, InterruptedException { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); + internalCluster().startDataAndWarmNodes(2); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -618,6 +654,8 @@ private IndexShard getIndexShard(String node, String indexName) { } public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX; @@ -639,13 +677,7 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true)); Client client = client(); - Settings indexSettings = Settings.builder() - .put(super.indexSettings()) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build(); + Settings indexSettings = getIndexSettings(1, 0).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); createIndex(indexName1, indexSettings); int numDocsInIndex1 = randomIntBetween(2, 5); @@ -713,6 +745,8 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In } public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX; @@ -734,11 +768,8 @@ public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionExcep createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true)); Client client = client(); - Settings indexSettings = Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings indexSettings = getIndexSettings(1, 0).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), IndexSettings.DEFAULT_REFRESH_INTERVAL) .build(); createIndex(indexName1, indexSettings); @@ -781,7 +812,7 @@ public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionExcep public void testInvalidRestoreRequestScenarios() throws Exception { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String index = "test-index"; String snapshotRepo = "test-restore-snapshot-repo"; String newRemoteStoreRepo = "test-new-rs-repo"; @@ -801,7 +832,7 @@ public void testInvalidRestoreRequestScenarios() throws Exception { indexDocuments(client, index, numDocsInIndex); ensureGreen(index); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); logger.info("--> snapshot"); SnapshotInfo snapshotInfo = createSnapshot(snapshotRepo, snapshotName1, new ArrayList<>(List.of(index))); @@ -962,7 +993,7 @@ public void testInvalidRestoreRequestScenarios() throws Exception { public void testRestoreOperationsUsingDifferentRepos() throws Exception { disableRepoConsistencyCheck("Remote store repo"); String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String primary = internalCluster().startDataOnlyNode(); + String primary = internalCluster().startDataAndWarmNodes(1).get(0); String indexName1 = "testindex1"; String snapshotRepoName = "test-snapshot-repo"; String snapshotName1 = "test-snapshot1"; @@ -1028,7 +1059,7 @@ public void testRestoreOperationsUsingDifferentRepos() throws Exception { public void testContinuousIndexing() throws Exception { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataAndWarmNodes(1); String index = "test-index"; String snapshotRepo = "test-restore-snapshot-repo"; String baseSnapshotName = "snapshot_"; @@ -1038,11 +1069,10 @@ public void testContinuousIndexing() throws Exception { createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true)); Client client = client(); - Settings indexSettings = Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build(); + Settings indexSettings = getIndexSettings(1, 0).put( + IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), + IndexSettings.DEFAULT_REFRESH_INTERVAL + ).build(); createIndex(index, indexSettings); ensureGreen(index); @@ -1118,7 +1148,7 @@ public void testHashedPrefixTranslogMetadataCombination() throws Exception { .build(); internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNode(settings); + var primary = internalCluster().startDataAndWarmNodes(1, settings).get(0); String index = "test-index"; String snapshotRepo = "test-restore-snapshot-repo"; String baseSnapshotName = "snapshot_"; @@ -1128,11 +1158,10 @@ public void testHashedPrefixTranslogMetadataCombination() throws Exception { createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true)); Client client = client(); - Settings indexSettings = Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build(); + Settings indexSettings = getIndexSettings(1, 0).put( + IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), + IndexSettings.DEFAULT_REFRESH_INTERVAL + ).build(); createIndex(index, indexSettings); ensureGreen(index); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java index e8ad82bbd9a80..8de4f11939f44 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java @@ -31,6 +31,9 @@ package org.opensearch.snapshots; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.action.ActionRunnable; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; @@ -45,33 +48,74 @@ import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.Node; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.transport.client.Client; +import org.junit.After; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class CloneSnapshotV2IT extends AbstractSnapshotIntegTestCase { + public CloneSnapshotV2IT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + + @After + public void teardown() { + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + assertAcked(client().admin().indices().prepareDelete("_all").get()); + var nodes = internalCluster().getDataNodeInstances(Node.class); + for (var node : nodes) { + var fileCache = node.fileCache(); + fileCache.clear(); + } + } + } + public void testCloneShallowCopyV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); String indexName1 = "testindex1"; String indexName2 = "testindex2"; @@ -179,8 +223,8 @@ public void testCloneShallowCopyV2DeletedIndex() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); String indexName1 = "testindex1"; String indexName2 = "testindex2"; @@ -291,8 +335,8 @@ public void testCloneShallowCopyAfterDisablingV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); String indexName1 = "testindex1"; String indexName2 = "testindex2"; @@ -444,8 +488,8 @@ public void testRestoreFromClone() throws Exception { final Path remoteStoreRepoPath = randomRepoPath(); internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); String indexName1 = "testindex1"; String indexName2 = "testindex2"; diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java index a2fed10955ffc..3e8a3acb8498f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java @@ -8,6 +8,9 @@ package org.opensearch.snapshots; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.support.PlainActionFuture; @@ -16,7 +19,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.node.Node; import org.opensearch.remotestore.RemoteSnapshotIT; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -28,22 +34,47 @@ import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class ConcurrentSnapshotsV2IT extends RemoteSnapshotIT { + public ConcurrentSnapshotsV2IT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + public void testLongRunningSnapshotDontAllowConcurrentSnapshot() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -91,8 +122,8 @@ public void testLongRunningSnapshotDontAllowConcurrentSnapshot() throws Exceptio public void testCreateSnapshotFailInFinalize() throws Exception { final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -141,8 +172,8 @@ public void testCreateSnapshotV2MasterSwitch() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -194,8 +225,8 @@ public void testCreateSnapshotV2MasterSwitch() throws Exception { public void testPinnedTimestampFailSnapshot() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -250,8 +281,8 @@ public void testPinnedTimestampFailSnapshot() throws Exception { public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-create-snapshot-repo"; @@ -325,8 +356,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio public void testLongRunningSnapshotDontAllowConcurrentClone() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -376,8 +407,8 @@ public void testLongRunningSnapshotDontAllowConcurrentClone() throws Exception { public void testCloneSnapshotFailInFinalize() throws Exception { final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -429,8 +460,8 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -483,8 +514,8 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception { public void testDeleteWhileV2CreateOngoing() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -541,8 +572,8 @@ public void testDeleteWhileV2CreateOngoing() throws Exception { @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16205") public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -614,8 +645,8 @@ public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception { @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16205") public void testDeleteAndCloneV1WhileCreateOngoing() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; @@ -675,8 +706,8 @@ public void testDeleteAndCloneV1WhileCreateOngoing() throws Exception { public void testCloneV1WhileV2CreateOngoing() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); - internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); + internalCluster().startDataAndWarmNodes(1, pinnedTimestampSettings()); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String repoName = "test-create-snapshot-repo"; diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 33f0768e391b5..50f1e056248a4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -8,36 +8,81 @@ package org.opensearch.snapshots; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.transport.client.Client; +import org.junit.After; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase { + public DeleteSnapshotV2IT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + + @After + public void teardown() { + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + assertAcked(client().admin().indices().prepareDelete("_all").get()); + var nodes = internalCluster().getDataNodeInstances(Node.class); + for (var node : nodes) { + var fileCache = node.fileCache(); + fileCache.clear(); + } + } + } + private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { @@ -55,8 +100,8 @@ public void testDeleteShallowCopyV2() throws Exception { final Path remoteStoreRepoPath = randomRepoPath(); internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataAndWarmNodes(1, snapshotV2Settings(remoteStoreRepoPath)); String indexName1 = "testindex1"; String indexName2 = "testindex2"; @@ -139,7 +184,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) .build(); String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNode(settings); + internalCluster().startDataAndWarmNodes(1, settings); final Client clusterManagerClient = internalCluster().clusterManagerClient(); ensureStableCluster(2); @@ -222,7 +267,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots( .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) .build(); String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNode(settings); + internalCluster().startDataAndWarmNodes(1, settings); final Client clusterManagerClient = internalCluster().clusterManagerClient(); ensureStableCluster(2); @@ -323,7 +368,7 @@ public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exc .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) .build(); String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(3, settings); + internalCluster().startDataAndWarmNodes(3, settings); final Client clusterManagerClient = internalCluster().clusterManagerClient(); ensureStableCluster(4); @@ -341,6 +386,7 @@ public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exc final String remoteStoreEnabledIndexName = "remote-index-1"; final Settings remoteStoreEnabledIndexSettings = Settings.builder() .put(getRemoteStoreBackedIndexSettings()) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(0)) .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 2) .build(); createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 52eabcfb9a486..e347072513e85 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5285,7 +5285,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore( } } assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() - : "There should not be any segments file in the dir"; + || indexSettings.isWarmIndex() : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } else if (segmentsNFile != null) { try ( diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java index aabe553b4e3eb..0c8d2c560512c 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; @@ -33,12 +34,12 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.index.store.remote.utils.FileTypeUtils.BLOCK_FILE_IDENTIFIER; import static org.apache.lucene.index.IndexFileNames.SEGMENTS; /** @@ -114,11 +115,31 @@ public String[] listAll() throws IOException { ensureOpen(); logger.trace("Composite Directory[{}]: listAll() called", this::toString); String[] localFiles = localDirectory.listAll(); - Set allFiles = new HashSet<>(Arrays.asList(localFiles)); - String[] remoteFiles = getRemoteFiles(); - allFiles.addAll(Arrays.asList(remoteFiles)); + String[] remoteFiles; + + // Check if local directory has any segments_n files + boolean hasLocalSegments = Arrays.stream(localFiles).anyMatch(fileName -> fileName.startsWith(IndexFileNames.SEGMENTS)); + + try { + if (hasLocalSegments) { + // If local has segments_n, filter out segments_n from remote + remoteFiles = Arrays.stream(remoteDirectory.listAll()) + .filter(fileName -> !fileName.startsWith(IndexFileNames.SEGMENTS)) + .toArray(String[]::new); + } else { + // If local doesn't have segments_n, include all remote files + remoteFiles = remoteDirectory.listAll(); + } + } catch (NullPointerException e) { + remoteFiles = new String[] {}; + } + logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles)); - logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles)); + String[] finalRemoteFiles = remoteFiles; + logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(finalRemoteFiles)); + Set allFiles = Stream.concat(Arrays.stream(localFiles), Arrays.stream(remoteFiles)) + .map(s -> s.contains(BLOCK_FILE_IDENTIFIER) ? s.substring(0, s.indexOf(BLOCK_FILE_IDENTIFIER)) : s) + .collect(Collectors.toSet()); Set nonBlockLuceneFiles = allFiles.stream() .filter(file -> !FileTypeUtils.isBlockFile(file)) .collect(Collectors.toUnmodifiableSet()); diff --git a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java index 43340be680a86..07c7a832c0624 100644 --- a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java @@ -53,6 +53,7 @@ public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTest private final static String NON_EXISTENT_FILE = "non_existent_file"; private final static String NEW_FILE = "new_file"; private final static String TEMP_FILE = "temp_file.tmp"; + private final static String LOCAL_SEGMENT_FILE = "segments_2"; private final static int FILE_CACHE_CAPACITY = 10000; @Before @@ -73,6 +74,13 @@ public void testListAll() throws IOException { assertArrayEquals(expectedFileNames, actualFileNames); } + public void testListAll_withLocalSegmentFiles() throws IOException { + addFilesToDirectory(new String[] { LOCAL_SEGMENT_FILE }); + String[] actualFileNames = compositeDirectory.listAll(); + String[] expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_2", "temp_file.tmp" }; + assertArrayEquals(expectedFileNames, actualFileNames); + } + public void testDeleteFile() throws IOException { assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY)); assertTrue(existsInLocalDirectory(BLOCK_FILE_PRESENT_LOCALLY)); diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 02b5164e3d822..76113deb365b2 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -80,7 +80,7 @@ import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.snapshots.mockstore.MockRepository; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPoolStats; @@ -105,6 +105,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES; import static org.hamcrest.Matchers.empty; @@ -113,7 +114,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -public abstract class AbstractSnapshotIntegTestCase extends OpenSearchIntegTestCase { +public abstract class AbstractSnapshotIntegTestCase extends ParameterizedStaticSettingsOpenSearchIntegTestCase { protected final static String TEST_REMOTE_STORE_REPO_SUFFIX = "__rs"; private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-"; @@ -125,6 +126,23 @@ public abstract class AbstractSnapshotIntegTestCase extends OpenSearchIntegTestC .put("thread_pool.snapshot.max", 5) .build(); + public AbstractSnapshotIntegTestCase(Settings nodeSettings) { + super(nodeSettings); + } + + public AbstractSnapshotIntegTestCase() { + super(Settings.EMPTY); + } + + /* + Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) + As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory + */ + @Override + protected boolean addMockIndexStorePlugin() { + return WRITABLE_WARM_INDEX_SETTING.get(settings) == false; + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -522,14 +540,19 @@ protected void indexRandomDocs(String index, int numdocs) throws InterruptedExce } protected Settings getRemoteStoreBackedIndexSettings() { - return Settings.builder() + Settings.Builder settingsBuilder = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") .put("index.refresh_interval", "300s") .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build(); + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + + if (WRITABLE_WARM_INDEX_SETTING.get(settings)) { + settingsBuilder.put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true); + } + + return settingsBuilder.build(); } protected Settings.Builder snapshotRepoSettingsForShallowCopy(Path path) { diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index c84fe8755419b..24a195e0c1b78 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -105,6 +105,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -1769,7 +1770,12 @@ public synchronized void stopRandomNode(final Predicate filter) throws throw new AssertionError("Tried to stop the only cluster-manager eligible shared node"); } logger.info("Closing filtered random node [{}] ", nodeAndClient.name); + FileCache fileCache = nodeAndClient.node().fileCache(); stopNodesAndClient(nodeAndClient); + // Clear file cache to avoid file leaks during node stop + if (fileCache != null && WARM_NODE_PREDICATE.test(nodeAndClient)) { + fileCache.clear(); + } } }