RepositoriesService.java
@@ -86,7 +86,7 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra
         // Doesn't make sense to maintain repositories on non-master and non-data nodes
         // Nothing happens there anyway
         if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isMasterNode(settings)) {
-            clusterService.addStateApplier(this);
+            clusterService.addHighPriorityApplier(this);
         }
         this.verifyAction = new VerifyNodeRepositoryAction(transportService, clusterService, this);
     }
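Note on the one-line change above: ClusterService delegates applier registration to ClusterApplierService, which invokes high-priority appliers before normal-priority ones on every applied cluster state. Registering RepositoriesService at high priority therefore means its repositories map is rebuilt before appliers that start shards (such as IndicesClusterStateService) run. A minimal sketch of that ordering contract, assuming a ClusterService instance in scope; the two applier bodies below are hypothetical and only illustrate the priorities:

// Each applier receives applied states via ClusterStateApplier#applyClusterState(ClusterChangedEvent).
ClusterStateApplier repositoriesApplier = event -> {
    // High-priority appliers observe the new state first, so the repositories
    // map can be rebuilt here before anything else depends on it.
};
ClusterStateApplier shardStartingApplier = event -> {
    // Normal-priority appliers run afterwards and can safely look up
    // repositories registered by the high-priority applier above.
};
clusterService.addHighPriorityApplier(repositoriesApplier); // applied first
clusterService.addStateApplier(shardStartingApplier);       // applied after all high-priority appliers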
ClusterStateApplierOrderingTests.java (new file)
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.ALLOCATION_FAILED;
import static org.elasticsearch.gateway.GatewayService.RECOVER_AFTER_DATA_NODES_SETTING;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

@ESIntegTestCase.ClusterScope(scope = TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class ClusterStateApplierOrderingTests extends BaseSearchableSnapshotsIntegTestCase {

    public void testRepositoriesServiceClusterStateApplierIsCalledBeforeIndicesClusterStateService() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        internalCluster().startNodes(2);
        final String fsRepoName = "fsrepo";
        final String indexName = "test-index";
        final String restoredIndexName = "restored-index";

        final Path repo = randomRepoPath();
        assertAcked(
            client().admin().cluster().preparePutRepository(fsRepoName).setType("fs").setSettings(Settings.builder().put("location", repo))
        );

        // Peer recovery always copies .liv files, but we do not permit writing to searchable snapshot directories, so this doesn't work;
        // we can bypass that by forcing soft deletes to be used. TODO this restriction can be lifted when #55142 is resolved.
        assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)));

        final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
        for (int i = between(10, 10_000); i >= 0; i--) {
            indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz"));
        }
        indexRandom(true, true, indexRequestBuilders);
        refresh(indexName);

        CreateSnapshotResponse createSnapshotResponse = client().admin()
            .cluster()
            .prepareCreateSnapshot(fsRepoName, "snapshot")
            .setWaitForCompletion(true)
            .get();
        final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
        assertThat(snapshotInfo.successfulShards(), greaterThan(0));
        assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));

        assertAcked(client().admin().indices().prepareDelete(indexName));

        Settings.Builder indexSettingsBuilder = Settings.builder()
            .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), false)
            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

        final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
            restoredIndexName,
            fsRepoName,
            snapshotInfo.snapshotId().getName(),
            indexName,
            indexSettingsBuilder.build(),
            Strings.EMPTY_ARRAY,
            true
        );

        final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
        assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
        ensureGreen(restoredIndexName);

        // In order to reproduce this issue we need to force a full cluster restart so that the newly elected master
        // sends the entire ClusterState in one message, including assigned shards and repositories.
        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) {
                // make sure state is not recovered until a third node joins
                return Settings.builder().put(RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 3).build();
            }
        });

        // Record every unassigned reason observed while the routing table changes; if RepositoriesService were
        // applied after IndicesClusterStateService, the searchable snapshot shards would fail allocation.
        List<UnassignedInfo.Reason> unassignedReasons = new ArrayList<>();
        internalCluster().clusterService().addListener(event -> {
            if (event.routingTableChanged()) {
                for (RoutingNode routingNode : event.state().getRoutingNodes()) {
                    for (ShardRouting shardRouting : routingNode) {
                        if (shardRouting.unassignedInfo() != null) {
                            unassignedReasons.add(shardRouting.unassignedInfo().getReason());
                        }
                    }
                }
            }
        });

        internalCluster().ensureAtLeastNumDataNodes(3);

        ensureGreen(restoredIndexName);
        assertThat("Unexpected shard allocation failure", unassignedReasons.stream().noneMatch(r -> r == ALLOCATION_FAILED), is(true));
    }
}