Merged
@@ -27,9 +27,13 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;

@@ -145,6 +149,9 @@ public void testSimpleWorkflow() {
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
final BlobStoreRepository repo =
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
}

public void testMissingUri() {
@@ -26,10 +26,12 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.test.StreamsUtils;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@@ -76,6 +78,20 @@ protected void createRepository(String repoName) {
}

@Override
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor genericExec) throws Exception {
// S3 is only eventually consistent for the list operations used by these assertions, so we retry for up to
// 10 minutes, assuming that the listings become consistent within that window.
assertBusy(() -> assertTrue(super.assertCorruptionVisible(repo, genericExec)), 10L, TimeUnit.MINUTES);
return true;
}

@Override
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
// S3 is only eventually consistent for the list operations used by these assertions, so we retry for up to
// 10 minutes, assuming that the listings become consistent within that window.
assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES);
}
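
The two overrides above share one idea: an assertion that fails only transiently against an eventually consistent store is simply re-run until it passes or a deadline expires, which is what the test framework's assertBusy does. A minimal, self-contained sketch of that retry loop, using a hypothetical retryUntilConsistent helper rather than the ESTestCase implementation:

import java.util.concurrent.TimeUnit;

// Sketch of the assertBusy-style retry used above: re-run the assertion until it
// stops throwing AssertionError or the deadline expires. Hypothetical helper,
// not the ESTestCase implementation.
final class EventualConsistencyRetry {

    @FunctionalInterface
    interface CheckedRunnable {
        void run() throws Exception;
    }

    static void retryUntilConsistent(CheckedRunnable assertion, long timeout, TimeUnit unit) throws Exception {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        long sleepMillis = 100;
        while (true) {
            try {
                assertion.run();
                return; // the listing has become consistent
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // still inconsistent after the full timeout
                }
                Thread.sleep(sleepMillis);
                sleepMillis = Math.min(sleepMillis * 2, 5_000); // cap the backoff
            }
        }
    }
}

A caller would wrap the consistency check the same way the overrides do, e.g. retryUntilConsistent(() -> assertConsistentRepository(repo, executor), 10, TimeUnit.MINUTES).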

protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
// to become consistent.
@@ -57,7 +57,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -419,46 +418,68 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
}
// Delete the snapshot from the index file, since it is the source of truth for active snapshots
final RepositoryData repositoryData;
final RepositoryData updatedRepositoryData;
final Map<String, BlobContainer> foundIndices;
try {
repositoryData = getRepositoryData();
final RepositoryData repositoryData = getRepositoryData();
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
// Cache the indices that were found before writing out the new index-N blob, so that a stuck master will never
// delete an index that another master node created after this index-N blob was written.
foundIndices = blobStore().blobContainer(basePath().add("indices")).children();
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (Exception ex) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
return;
}
final SnapshotInfo finalSnapshotInfo = snapshot;
final Collection<IndexId> unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values());
unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values());
try {
blobContainer().deleteBlobsIgnoringIfNotExists(
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID())));
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
}
final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices();
deleteIndices(
Optional.ofNullable(finalSnapshotInfo)
.map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList()))
.map(info -> info.indices().stream().filter(survivingIndices::containsKey)
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
.orElse(Collections.emptyList()),
snapshotId,
ActionListener.map(listener, v -> {
try {
blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists(
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException e) {
logger.warn(() ->
new ParameterizedMessage(
"[{}] indices {} are no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e);
}
cleanupStaleIndices(foundIndices, survivingIndices);
return null;
})
);
}
}
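
The comment in the try block carries the safety argument: the children of the indices path are listed before the new index-N blob is written, and the later cleanup only ever deletes from that cached listing, so an index folder created by a newer master after this node's index-N write can never be deleted by a stale master. A toy sketch of that ordering, with hypothetical types rather than the real BlobStoreRepository API:

import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the blob store operations used above.
interface ToyRepo {
    Map<String, Object> listIndexFolders();                 // analogous to children()
    void writeIndexGeneration(Set<String> referencedIds);   // analogous to writeIndexGen(...)
}

final class SafeDeleteOrdering {
    // Returns the set of folders the cleanup step is allowed to touch.
    static Map<String, Object> prepareCleanup(ToyRepo repo, Set<String> survivingIndexIds) {
        // 1. List first: a point-in-time view of the folders that exist right now.
        final Map<String, Object> foundIndices = repo.listIndexFolders();
        // 2. Only then publish the new generation.
        repo.writeIndexGeneration(survivingIndexIds);
        // 3. Cleanup works off 'foundIndices', never a fresh listing, so a folder
        //    created by a newer master after step 2 is invisible to it.
        return foundIndices;
    }
}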

private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Map<String, IndexId> survivingIndices) {
try {
final Set<String> survivingIndexIds = survivingIndices.values().stream()
.map(IndexId::getId).collect(Collectors.toSet());
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder", metadata.name(), indexSnId), e);
}
}
} catch (Exception e) {
// TODO: We shouldn't blanket-catch and suppress all exceptions here; they should be handled safely upstream.
// Currently this catch exists as a stop-gap solution to keep unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
}
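
Note the `assert false : e;` in the catch block: with assertions enabled, as they are in the test runners, any unexpected exception fails the test immediately, while in production, where assertions are disabled, the failure is only logged and the delete proceeds. A minimal sketch of that idiom in isolation:

// Sketch of the 'fail hard in tests, log and continue in production' idiom used
// in cleanupStaleIndices. Run with -ea to see the assert turn the exception into
// an immediate failure; without -ea only the log line is printed.
final class BestEffortCleanup {

    static void runBestEffort(Runnable cleanupTask) {
        try {
            cleanupTask.run();
        } catch (RuntimeException e) {
            assert false : e; // trips under -ea (tests); no-op otherwise
            System.err.println("cleanup failed, continuing: " + e);
        }
    }

    public static void main(String[] args) {
        runBestEffort(() -> { throw new IllegalStateException("boom"); });
        System.out.println("reached only when assertions are disabled");
    }
}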

private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
if (indices.isEmpty()) {
listener.onResponse(null);
Expand Down Expand Up @@ -523,9 +544,9 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);
try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
final RepositoryData repositoryData = getRepositoryData();
writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (FileAlreadyExistsException ex) {
// if another master was elected and took over finalizing the snapshot, it is possible
// that both nodes try to finalize the snapshot and write to the same blobs, so we just
@@ -20,11 +20,13 @@

import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
@@ -65,6 +67,32 @@ public void assertConsistentHistoryInLuceneIndex() throws Exception {
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
}

private String skipRepoConsistencyCheckReason;

@After
public void assertRepoConsistency() {
if (skipRepoConsistencyCheckReason == null) {
client().admin().cluster().prepareGetRepositories().get().repositories()
.stream()
.map(RepositoryMetaData::name)
.forEach(name -> {
final List<SnapshotInfo> snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots();
// Delete one random snapshot to trigger repository cleanup.
if (snapshots.isEmpty() == false) {
client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
});
} else {
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);
}
}

protected void disableRepoConsistencyCheck(String reason) {
assertNotNull(reason);
skipRepoConsistencyCheckReason = reason;
}
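
The hook above establishes a default-on invariant: every snapshot test now verifies repository consistency in @After, and a test can only opt out by recording an explicit reason. The pattern can be isolated as below (plain JUnit 4 with hypothetical names; the real check is BlobStoreTestUtil.assertRepoConsistency):

import org.junit.After;

// Sketch of the opt-out pattern above: the invariant check runs after every test
// by default, and skipping it requires a stated reason. Hypothetical base class,
// not the Elasticsearch test infrastructure.
public abstract class InvariantCheckingTestCase {

    private String skipReason;

    protected void disableInvariantCheck(String reason) {
        if (reason == null) {
            throw new IllegalArgumentException("a reason is required to skip the check");
        }
        this.skipReason = reason;
    }

    @After
    public void assertInvariants() throws Exception {
        if (skipReason != null) {
            System.out.println("--> skipped invariant check because [" + skipReason + "]");
            return;
        }
        checkInvariants(); // e.g. repository consistency
    }

    protected abstract void checkInvariants() throws Exception;
}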

public static long getFailureCount(String repository) {
long failureCount = 0;
for (RepositoriesService repositoriesService :
@@ -723,6 +723,7 @@ public boolean clearData(String nodeName) {
}

public void testRegistrationFailure() {
disableRepoConsistencyCheck("This test does not create any data in the repository");
logger.info("--> start first node");
internalCluster().startNode();
logger.info("--> start second node");
@@ -742,6 +743,7 @@ public void testRegistrationFailure() {
}

public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository");
Settings nodeSettings = Settings.EMPTY;
logger.info("--> start two nodes");
internalCluster().startNodes(2, nodeSettings);
@@ -144,8 +144,8 @@ public void testWhenMetadataAreLoaded() throws Exception {
// Deleting a snapshot does not load the global metadata state but loads each index's metadata
assertAcked(client().admin().cluster().prepareDeleteSnapshot("repository", "snap").get());
assertGlobalMetadataLoads("snap", 1);
assertIndexMetadataLoads("snap", "docs", 5);
assertIndexMetadataLoads("snap", "others", 4);
assertIndexMetadataLoads("snap", "docs", 4);
assertIndexMetadataLoads("snap", "others", 3);
}

private void assertGlobalMetadataLoads(final String snapshot, final int times) {
@@ -184,6 +184,8 @@ public void testRepositoryAckTimeout() throws Exception {
}

public void testRepositoryVerification() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository.");

Client client = client();

Settings settings = Settings.builder()
@@ -363,7 +363,7 @@ public void testSingleGetAfterRestore() throws Exception {
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
}

public void testFreshIndexUUID() {
public void testFreshIndexUUID() throws InterruptedException {
Client client = client();

logger.info("--> creating repository");
@@ -541,7 +541,6 @@ public void testRestoreAliases() throws Exception {
logger.info("--> check that aliases are not restored and existing aliases still exist");
assertAliasesMissing(client.admin().indices().prepareAliasesExist("alias-123", "alias-1").get());
assertAliasesExist(client.admin().indices().prepareAliasesExist("alias-3").get());

}

public void testRestoreTemplates() throws Exception {
@@ -594,7 +593,6 @@ public void testRestoreTemplates() throws Exception {
logger.info("--> check that template is restored");
getIndexTemplatesResponse = client().admin().indices().prepareGetTemplates().get();
assertIndexTemplateExists(getIndexTemplatesResponse, "test-template");

}

public void testIncludeGlobalState() throws Exception {
@@ -781,10 +779,10 @@ public void testIncludeGlobalState() throws Exception {
assertFalse(client().admin().cluster().prepareGetPipeline("barbaz").get().isFound());
assertNull(client().admin().cluster().prepareGetStoredScript("foobar").get().getSource());
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));

}

public void testSnapshotFileFailureDuringSnapshot() {
public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException {
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Client client = client();

logger.info("--> creating repository");
@@ -911,6 +909,8 @@ public void testDataFileFailureDuringSnapshot() throws Exception {
}

public void testDataFileFailureDuringRestore() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Path repositoryLocation = randomRepoPath();
Client client = client();
logger.info("--> creating repository");
@@ -974,6 +974,8 @@ public void testDataFileFailureDuringRestore() throws Exception {
}

public void testDataFileCorruptionDuringRestore() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Path repositoryLocation = randomRepoPath();
Client client = client();
logger.info("--> creating repository");
@@ -1238,7 +1240,6 @@ public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Except
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get();
assertThat(countResponse.getHits().getTotalHits().value, equalTo(100L));

}

public void testUnallocatedShards() throws Exception {
@@ -1703,8 +1704,6 @@ public void testRenameOnRestore() throws Exception {
.setIndices("test-idx-1").setRenamePattern("test-idx").setRenameReplacement("alias")
.setWaitForCompletion(true).setIncludeAliases(false).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));


}

public void testMoveShardWhileSnapshotting() throws Exception {
@@ -1771,6 +1770,7 @@ public void testMoveShardWhileSnapshotting() throws Exception {
}

public void testDeleteRepositoryWhileSnapshotting() throws Exception {
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Client client = client();
Path repositoryLocation = randomRepoPath();
logger.info("--> creating repository");
@@ -2329,7 +2329,6 @@ public void testChangeSettingsOnRestore() throws Exception {

assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")).get(), numdocs);
assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")).get(), numdocs);

}

public void testRecreateBlocksOnRestore() throws Exception {
@@ -2423,6 +2422,8 @@ public void testRecreateBlocksOnRestore() throws Exception {
}

public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Client client = client();

boolean allowPartial = randomBoolean();
@@ -2747,6 +2748,8 @@ private boolean waitForRelocationsToStart(final String index, TimeValue timeout)
}

public void testSnapshotName() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository");

final Client client = client();

logger.info("--> creating repository");
@@ -2767,6 +2770,8 @@ public void testSnapshotName() throws Exception {
}

public void testListCorruptedSnapshot() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
@@ -3336,6 +3341,9 @@ public void testSnapshotCanceledOnRemovedShard() throws Exception {
}

public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
// TODO: Fix the repo cleanup logic to handle these leaked snap files and only exclude test-repo (the mock repo) here.
disableRepoConsistencyCheck(
"This test uses a purposely broken repository implementation that results in leaking snap-{uuid}.dat files");
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
final Client client = client();