Skip to content

Commit 60ff993

Browse files
committed
Enforce higher priority for RepositoriesService ClusterStateApplier
This avoids shards allocation failures when the repository instance comes in the same ClusterState update as the shard allocation. Backport of #58808
1 parent 49f4227 commit 60ff993

File tree

2 files changed

+121
-1
lines changed

2 files changed

+121
-1
lines changed

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra
8686
// Doesn't make sense to maintain repositories on non-master and non-data nodes
8787
// Nothing happens there anyway
8888
if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isMasterNode(settings)) {
89-
clusterService.addStateApplier(this);
89+
clusterService.addHighPriorityApplier(this);
9090
}
9191
this.verifyAction = new VerifyNodeRepositoryAction(transportService, clusterService, this);
9292
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.searchablesnapshots;
8+
9+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
10+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
11+
import org.elasticsearch.action.index.IndexRequestBuilder;
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.cluster.routing.RoutingNode;
14+
import org.elasticsearch.cluster.routing.ShardRouting;
15+
import org.elasticsearch.cluster.routing.UnassignedInfo;
16+
import org.elasticsearch.common.Strings;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.index.IndexSettings;
19+
import org.elasticsearch.snapshots.SnapshotInfo;
20+
import org.elasticsearch.test.ESIntegTestCase;
21+
import org.elasticsearch.test.InternalTestCluster;
22+
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
23+
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
24+
25+
import java.nio.file.Path;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
29+
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.ALLOCATION_FAILED;
30+
import static org.elasticsearch.gateway.GatewayService.RECOVER_AFTER_DATA_NODES_SETTING;
31+
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
32+
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
33+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
34+
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.greaterThan;
36+
import static org.hamcrest.Matchers.is;
37+
38+
@ESIntegTestCase.ClusterScope(scope = TEST, numDataNodes = 2)
39+
public class ClusterStateApplierOrderingTests extends BaseSearchableSnapshotsIntegTestCase {
40+
41+
public void testRepositoriesServiceClusterStateApplierIsCalledBeforeIndicesClusterStateService() throws Exception {
42+
final String fsRepoName = "fsrepo";
43+
final String indexName = "test-index";
44+
final String restoredIndexName = "restored-index";
45+
46+
final Path repo = randomRepoPath();
47+
assertAcked(
48+
client().admin().cluster().preparePutRepository(fsRepoName).setType("fs").setSettings(Settings.builder().put("location", repo))
49+
);
50+
51+
// Peer recovery always copies .liv files but we do not permit writing to searchable snapshot directories so this doesn't work, but
52+
// we can bypass this by forcing soft deletes to be used. TODO this restriction can be lifted when #55142 is resolved.
53+
assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)));
54+
55+
final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
56+
for (int i = between(10, 10_000); i >= 0; i--) {
57+
indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz"));
58+
}
59+
indexRandom(true, true, indexRequestBuilders);
60+
refresh(indexName);
61+
62+
CreateSnapshotResponse createSnapshotResponse = client().admin()
63+
.cluster()
64+
.prepareCreateSnapshot(fsRepoName, "snapshot")
65+
.setWaitForCompletion(true)
66+
.get();
67+
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
68+
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
69+
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
70+
71+
assertAcked(client().admin().indices().prepareDelete(indexName));
72+
73+
Settings.Builder indexSettingsBuilder = Settings.builder()
74+
.put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), false)
75+
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
76+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
77+
78+
final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
79+
restoredIndexName,
80+
fsRepoName,
81+
snapshotInfo.snapshotId().getName(),
82+
indexName,
83+
indexSettingsBuilder.build(),
84+
Strings.EMPTY_ARRAY,
85+
true
86+
);
87+
88+
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
89+
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
90+
ensureGreen(restoredIndexName);
91+
92+
// In order to reproduce this issue we need to force a full cluster restart so the new elected master
93+
// sends the entire ClusterState in one message, including assigned shards and repositories.
94+
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
95+
@Override
96+
public Settings onNodeStopped(String nodeName) {
97+
// make sure state is not recovered until a third node joins
98+
return Settings.builder().put(RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 3).build();
99+
}
100+
});
101+
102+
List<UnassignedInfo.Reason> unassignedReasons = new ArrayList<>();
103+
internalCluster().clusterService().addListener(event -> {
104+
if (event.routingTableChanged()) {
105+
for (RoutingNode routingNode : event.state().getRoutingNodes()) {
106+
for (ShardRouting shardRouting : routingNode) {
107+
if (shardRouting.unassignedInfo() != null) {
108+
unassignedReasons.add(shardRouting.unassignedInfo().getReason());
109+
}
110+
}
111+
}
112+
}
113+
});
114+
115+
internalCluster().ensureAtLeastNumDataNodes(3);
116+
117+
ensureGreen(restoredIndexName);
118+
assertThat("Unexpected shard allocation failure", unassignedReasons.stream().noneMatch(r -> r == ALLOCATION_FAILED), is(true));
119+
}
120+
}

0 commit comments

Comments
 (0)