Skip to content

Commit 3d23cb4

Browse files
Speed up Snapshot Finalization (#47283) (#47309)
As a result of #45689, snapshot finalization started to take significantly longer than before. This is unfortunate since it increases the likelihood of failing to finalize after having written out all the segment blobs. This change parallelizes all the metadata writes that can safely run in parallel in the finalization step to speed the finalization step up again. Also, this will generally speed up the snapshot process overall in the case of a large number of indices. This is also a nice-to-have for #46250, since we add yet another step (deleting old index- blobs in the shards) to the finalization.
1 parent bd2abee commit 3d23cb4

File tree

10 files changed

+122
-115
lines changed

10 files changed

+122
-115
lines changed

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
7979
}
8080

8181
@Override
82-
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
83-
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
84-
MetaData metaData, Map<String, Object> userMetadata) {
85-
return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
86-
includeGlobalState, metaData, userMetadata);
82+
public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
83+
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
84+
MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
85+
in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
86+
includeGlobalState, metaData, userMetadata, listener);
8787
}
8888

8989
@Override

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,11 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
135135
* @param shardFailures list of shard failures
136136
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
137137
* @param includeGlobalState include cluster global state
138-
* @return snapshot description
138+
* @param listener listener to be called on completion of the snapshot
139139
*/
140-
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
141-
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
142-
MetaData clusterMetaData, Map<String, Object> userMetadata);
140+
void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
141+
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
142+
MetaData clusterMetaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener);
143143

144144
/**
145145
* Deletes snapshot

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@
9797
import java.io.FilterInputStream;
9898
import java.io.IOException;
9999
import java.io.InputStream;
100-
import java.nio.file.FileAlreadyExistsException;
101100
import java.nio.file.NoSuchFileException;
102101
import java.util.ArrayList;
103102
import java.util.Arrays;
@@ -665,53 +664,60 @@ private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexI
665664
}
666665

667666
@Override
668-
public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
669-
final List<IndexId> indices,
670-
final long startTime,
671-
final String failure,
672-
final int totalShards,
673-
final List<SnapshotShardFailure> shardFailures,
674-
final long repositoryStateId,
675-
final boolean includeGlobalState,
676-
final MetaData clusterMetaData,
677-
final Map<String, Object> userMetadata) {
678-
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
679-
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
680-
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
681-
includeGlobalState, userMetadata);
667+
public void finalizeSnapshot(final SnapshotId snapshotId,
668+
final List<IndexId> indices,
669+
final long startTime,
670+
final String failure,
671+
final int totalShards,
672+
final List<SnapshotShardFailure> shardFailures,
673+
final long repositoryStateId,
674+
final boolean includeGlobalState,
675+
final MetaData clusterMetaData,
676+
final Map<String, Object> userMetadata,
677+
final ActionListener<SnapshotInfo> listener) {
678+
679+
// Once we're done writing all metadata, we update the index-N blob to finalize the snapshot
680+
final ActionListener<SnapshotInfo> afterMetaWrites = ActionListener.wrap(snapshotInfo -> {
681+
writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId);
682+
listener.onResponse(snapshotInfo);
683+
}, ex -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", ex)));
684+
685+
// We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob
686+
final GroupedActionListener<SnapshotInfo> allMetaListener =
687+
new GroupedActionListener<>(ActionListener.map(afterMetaWrites, snapshotInfos -> {
688+
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
689+
return snapshotInfos.iterator().next();
690+
}), 2 + indices.size());
691+
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
682692

683-
try {
684-
// We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
685-
// mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
686-
// index or global metadata will be compatible with the segments written in this snapshot as well.
687-
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that
688-
// decrements the generation it points at
693+
// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
694+
// mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
695+
// index or global metadata will be compatible with the segments written in this snapshot as well.
696+
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
697+
// that decrements the generation it points at
689698

690-
// Write Global MetaData
699+
// Write Global MetaData
700+
executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
691701
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
702+
l.onResponse(null);
703+
}));
692704

693-
// write the index metadata for each index in the snapshot
694-
for (IndexId index : indices) {
705+
// write the index metadata for each index in the snapshot
706+
for (IndexId index : indices) {
707+
executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
695708
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
696-
}
697-
} catch (IOException ex) {
698-
throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex);
709+
l.onResponse(null);
710+
}));
699711
}
700712

701-
try {
702-
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
703-
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
704-
writeIndexGen(updatedRepositoryData, repositoryStateId);
705-
} catch (FileAlreadyExistsException ex) {
706-
// if another master was elected and took over finalizing the snapshot, it is possible
707-
// that both nodes try to finalize the snapshot and write to the same blobs, so we just
708-
// log a warning here and carry on
709-
throw new RepositoryException(metadata.name(), "Blob already exists while " +
710-
"finalizing snapshot, assume the snapshot has already been saved", ex);
711-
} catch (IOException ex) {
712-
throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex);
713-
}
714-
return blobStoreSnapshot;
713+
// Write the snap-${uuid}.dat blob as one of the grouped metadata writes: it must report to allMetaListener (sized
// 2 + indices.size()) so that the index-N update in afterMetaWrites only runs once ALL metadata blobs are written
// and so that a failure of any metadata write is propagated instead of being silently dropped.
executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> {
714+
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
715+
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
716+
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
717+
includeGlobalState, userMetadata);
718+
snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
719+
// This is the only non-null response fed into the group; the mapping listener extracts it as the final result.
afterMetaListener.onResponse(snapshotInfo);
720+
}));
715721
}
716722

717723
@Override

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -561,25 +561,25 @@ public void onNoLongerMaster() {
561561
private void cleanupAfterError(Exception exception) {
562562
threadPool.generic().execute(() -> {
563563
if (snapshotCreated) {
564-
try {
565-
repositoriesService.repository(snapshot.snapshot().getRepository())
566-
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
567-
snapshot.indices(),
568-
snapshot.startTime(),
569-
ExceptionsHelper.detailedMessage(exception),
570-
0,
571-
Collections.emptyList(),
572-
snapshot.getRepositoryStateId(),
573-
snapshot.includeGlobalState(),
574-
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
575-
snapshot.userMetadata());
576-
} catch (Exception inner) {
577-
inner.addSuppressed(exception);
578-
logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository",
579-
snapshot.snapshot()), inner);
580-
}
564+
repositoriesService.repository(snapshot.snapshot().getRepository())
565+
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
566+
snapshot.indices(),
567+
snapshot.startTime(),
568+
ExceptionsHelper.stackTrace(exception),
569+
0,
570+
Collections.emptyList(),
571+
snapshot.getRepositoryStateId(),
572+
snapshot.includeGlobalState(),
573+
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
574+
snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> {
575+
}, inner -> {
576+
inner.addSuppressed(exception);
577+
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository",
578+
snapshot.snapshot()), inner);
579+
}), () -> userCreateSnapshotListener.onFailure(e)));
580+
} else {
581+
userCreateSnapshotListener.onFailure(e);
581582
}
582-
userCreateSnapshotListener.onFailure(e);
583583
});
584584
}
585585
}
@@ -1007,7 +1007,7 @@ protected void doRun() {
10071007
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
10081008
}
10091009
}
1010-
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
1010+
repository.finalizeSnapshot(
10111011
snapshot.getSnapshotId(),
10121012
entry.indices(),
10131013
entry.startTime(),
@@ -1017,9 +1017,10 @@ protected void doRun() {
10171017
entry.getRepositoryStateId(),
10181018
entry.includeGlobalState(),
10191019
metaDataForSnapshot(entry, metaData),
1020-
entry.userMetadata());
1021-
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
1022-
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
1020+
entry.userMetadata(), ActionListener.wrap(snapshotInfo -> {
1021+
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
1022+
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
1023+
}, this::onFailure));
10231024
}
10241025

10251026
@Override

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
159159
}
160160

161161
@Override
162-
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
163-
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
164-
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
165-
return null;
162+
public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
163+
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
164+
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
165+
ActionListener<SnapshotInfo> listener) {
166+
listener.onResponse(null);
166167
}
167168

168169
@Override

server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,48 +18,33 @@
1818
*/
1919
package org.elasticsearch.snapshots.mockstore;
2020

21+
import org.apache.lucene.util.SameThreadExecutorService;
22+
import org.elasticsearch.action.support.PlainActionFuture;
2123
import org.elasticsearch.cluster.metadata.MetaData;
2224
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2325
import org.elasticsearch.common.UUIDs;
2426
import org.elasticsearch.common.blobstore.BlobContainer;
2527
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.env.Environment;
27-
import org.elasticsearch.env.TestEnvironment;
2828
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
2929
import org.elasticsearch.snapshots.SnapshotId;
30+
import org.elasticsearch.snapshots.SnapshotInfo;
3031
import org.elasticsearch.test.ESTestCase;
3132
import org.elasticsearch.threadpool.ThreadPool;
3233

3334
import java.io.ByteArrayInputStream;
3435
import java.io.IOException;
3536
import java.io.InputStream;
3637
import java.nio.file.NoSuchFileException;
37-
import java.nio.file.Path;
3838
import java.util.Arrays;
3939
import java.util.Collections;
4040

41-
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
42-
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
4341
import static org.hamcrest.Matchers.equalTo;
4442
import static org.hamcrest.Matchers.startsWith;
4543
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.when;
4645

4746
public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
4847

49-
private Environment environment;
50-
51-
@Override
52-
public void setUp() throws Exception {
53-
super.setUp();
54-
final Path tempDir = createTempDir();
55-
final String nodeName = "testNode";
56-
environment = TestEnvironment.newEnvironment(Settings.builder()
57-
.put(NODE_NAME_SETTING.getKey(), nodeName)
58-
.put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath())
59-
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
60-
.build());
61-
}
62-
6348
public void testReadAfterWriteConsistently() throws IOException {
6449
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
6550
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
@@ -151,27 +136,37 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
151136

152137
public void testOverwriteSnapshotInfoBlob() {
153138
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
139+
final ThreadPool threadPool = mock(ThreadPool.class);
140+
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
154141
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
155142
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
156-
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
143+
xContentRegistry(), threadPool, blobStoreContext)) {
157144
repository.start();
158145

159146
// We create a snap- blob for snapshot "foo" in the first generation
147+
final PlainActionFuture<SnapshotInfo> future = PlainActionFuture.newFuture();
160148
final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
161149
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
162-
-1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
150+
-1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future);
151+
future.actionGet();
163152

164153
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
165154
final AssertionError assertionError = expectThrows(AssertionError.class,
166-
() -> repository.finalizeSnapshot(
167-
snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
168-
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()));
155+
() -> {
156+
final PlainActionFuture<SnapshotInfo> fut = PlainActionFuture.newFuture();
157+
repository.finalizeSnapshot(
158+
snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
159+
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), fut);
160+
fut.actionGet();
161+
});
169162
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
170163

171164
// We try to write yet another snap- blob for "foo" in the next generation.
172165
// It passes cleanly because the content of the blob is identical except for the timestamps.
166+
final PlainActionFuture<SnapshotInfo> future2 = PlainActionFuture.newFuture();
173167
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
174-
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
168+
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future2);
169+
future2.actionGet();
175170
}
176171
}
177172

0 commit comments

Comments
 (0)