Skip to content

Commit 504a89f

Browse files
authored
Step down as master when configured out of voting configuration (#37802)
Abdicates to another master-eligible node once the active master is reconfigured out of the voting configuration, for example through the use of voting configuration exclusions. Follow-up to #37712
1 parent 827c4f6 commit 504a89f

File tree

9 files changed

+170
-26
lines changed

9 files changed

+170
-26
lines changed

docs/reference/modules/discovery/adding-removing-nodes.asciidoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ The node that should be added to the exclusions list is specified using
7272
<<cluster-nodes,node filters>> in place of `node_name` here. If a call to the
7373
voting configuration exclusions API fails, you can safely retry it. Only a
7474
successful response guarantees that the node has actually been removed from the
75-
voting configuration and will not be reinstated.
75+
voting configuration and will not be reinstated. If it's the active master that
76+
was removed from the voting configuration, then it will abdicate to another
77+
master-eligible node that's still in the voting configuration, if such a node
78+
is available.
7679

7780
Although the voting configuration exclusions API is most useful for down-scaling
7881
a two-node to a one-node cluster, it is also possible to use it to remove

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
112112

113113
private final PeerFinder peerFinder;
114114
private final PreVoteCollector preVoteCollector;
115+
private final Random random;
115116
private final ElectionSchedulerFactory electionSchedulerFactory;
116117
private final UnicastConfiguredHostsResolver configuredHostsResolver;
117118
private final TimeValue publishTimeout;
@@ -153,6 +154,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
153154
this.lastJoin = Optional.empty();
154155
this.joinAccumulator = new InitialJoinAccumulator();
155156
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
157+
this.random = random;
156158
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
157159
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
158160
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
@@ -366,11 +368,33 @@ private void startElection() {
366368
}
367369
}
368370

371+
private void abdicateTo(DiscoveryNode newMaster) {
372+
assert Thread.holdsLock(mutex);
373+
assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode;
374+
assert newMaster.isMasterNode() : "should only abdicate to master-eligible node but was " + newMaster;
375+
final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1);
376+
logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm());
377+
getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> {
378+
if (isZen1Node(node) == false) {
379+
joinHelper.sendStartJoinRequest(startJoinRequest, node);
380+
}
381+
});
382+
// handling of start join messages on the local node will be dispatched to the generic thread-pool
383+
assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
384+
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
385+
becomeCandidate("after abdicating to " + newMaster);
386+
}
387+
369388
private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
370-
final String localNodeId = lastAcceptedState.nodes().getLocalNodeId();
371-
assert localNodeId != null;
372-
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId)
373-
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId);
389+
final DiscoveryNode localNode = lastAcceptedState.nodes().getLocalNode();
390+
assert localNode != null;
391+
return electionQuorumContains(lastAcceptedState, localNode);
392+
}
393+
394+
private static boolean electionQuorumContains(ClusterState lastAcceptedState, DiscoveryNode node) {
395+
final String nodeId = node.getId();
396+
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(nodeId)
397+
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(nodeId);
374398
}
375399

376400
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
@@ -780,7 +804,7 @@ ClusterState improveConfiguration(ClusterState clusterState) {
780804
.filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
781805
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
782806
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
783-
clusterState.getLastAcceptedConfiguration());
807+
getLocalNode(), clusterState.getLastAcceptedConfiguration());
784808
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
785809
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
786810
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
@@ -1192,7 +1216,18 @@ public void onSuccess(String source) {
11921216
updateMaxTermSeen(getCurrentTerm());
11931217

11941218
if (mode == Mode.LEADER) {
1195-
scheduleReconfigurationIfNeeded();
1219+
final ClusterState state = getLastAcceptedState(); // committed state
1220+
if (electionQuorumContainsLocalNode(state) == false) {
1221+
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
1222+
.filter(DiscoveryNode::isMasterNode)
1223+
.filter(node -> electionQuorumContains(state, node))
1224+
.collect(Collectors.toList());
1225+
if (masterCandidates.isEmpty() == false) {
1226+
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
1227+
}
1228+
} else {
1229+
scheduleReconfigurationIfNeeded();
1230+
}
11961231
}
11971232
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
11981233
}

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Optional;
3737
import java.util.Set;
3838
import java.util.function.LongSupplier;
39+
import java.util.stream.Collectors;
3940

4041
public abstract class Publication {
4142

@@ -92,6 +93,13 @@ public void onFaultyNode(DiscoveryNode faultyNode) {
9293
onPossibleCompletion();
9394
}
9495

96+
public List<DiscoveryNode> completedNodes() {
97+
return publicationTargets.stream()
98+
.filter(PublicationTarget::isSuccessfullyCompleted)
99+
.map(PublicationTarget::getDiscoveryNode)
100+
.collect(Collectors.toList());
101+
}
102+
95103
public boolean isCommitted() {
96104
return applyCommitRequest.isPresent();
97105
}
@@ -268,6 +276,10 @@ void onFaultyNode(DiscoveryNode faultyNode) {
268276
}
269277
}
270278

279+
DiscoveryNode getDiscoveryNode() {
280+
return discoveryNode;
281+
}
282+
271283
private void ackOnce(Exception e) {
272284
if (ackIsPending) {
273285
ackIsPending = false;
@@ -280,6 +292,10 @@ boolean isActive() {
280292
&& state != PublicationTargetState.APPLIED_COMMIT;
281293
}
282294

295+
boolean isSuccessfullyCompleted() {
296+
return state == PublicationTargetState.APPLIED_COMMIT;
297+
}
298+
283299
boolean isWaitingForQuorum() {
284300
return state == PublicationTargetState.WAITING_FOR_QUORUM;
285301
}

server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.util.set.Sets;
3131

3232
import java.util.Collection;
33+
import java.util.Collections;
3334
import java.util.Set;
3435
import java.util.TreeSet;
3536
import java.util.stream.Collectors;
@@ -90,18 +91,23 @@ public String toString() {
9091
* @param retiredNodeIds Nodes that are leaving the cluster and which should not appear in the configuration if possible. Nodes that are
9192
* retired and not in the current configuration will never appear in the resulting configuration; this is useful
9293
* for shifting the vote in a 2-node cluster so one of the nodes can be restarted without harming availability.
94+
* @param currentMaster The current master. Unless retired, we prefer to keep the current master in the config.
9395
* @param currentConfig The current configuration. As far as possible, we prefer to keep the current config as-is.
9496
* @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum.
9597
*/
96-
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, VotingConfiguration currentConfig) {
98+
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, DiscoveryNode currentMaster,
99+
VotingConfiguration currentConfig) {
97100
assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes;
98-
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds);
101+
assert liveNodes.contains(currentMaster) : "liveNodes = " + liveNodes + " master = " + currentMaster;
102+
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}, currentMaster={}",
103+
this, currentConfig, liveNodes, retiredNodeIds, currentMaster);
99104

100105
/*
101106
* There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config.
102107
* Firstly we divide the nodes into disjoint sets based on these properties:
103108
*
104-
* - nonRetiredInConfigNotLiveIds
109+
* - nonRetiredMaster
110+
* - nonRetiredNotMasterInConfigNotLiveIds
105111
* - nonRetiredInConfigLiveIds
106112
* - nonRetiredLiveNotInConfigIds
107113
*
@@ -125,6 +131,17 @@ public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String>
125131
final Set<String> nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds);
126132
nonRetiredInConfigLiveIds.removeAll(retiredNodeIds);
127133

134+
final Set<String> nonRetiredInConfigLiveMasterIds;
135+
final Set<String> nonRetiredInConfigLiveNotMasterIds;
136+
if (nonRetiredInConfigLiveIds.contains(currentMaster.getId())) {
137+
nonRetiredInConfigLiveNotMasterIds = new TreeSet<>(nonRetiredInConfigLiveIds);
138+
nonRetiredInConfigLiveNotMasterIds.remove(currentMaster.getId());
139+
nonRetiredInConfigLiveMasterIds = Collections.singleton(currentMaster.getId());
140+
} else {
141+
nonRetiredInConfigLiveNotMasterIds = nonRetiredInConfigLiveIds;
142+
nonRetiredInConfigLiveMasterIds = Collections.emptySet();
143+
}
144+
128145
final Set<String> nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds());
129146
nonRetiredLiveNotInConfigIds.removeAll(retiredNodeIds);
130147

@@ -151,9 +168,9 @@ public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String>
151168
* The new configuration is formed by taking this many nodes in the following preference order:
152169
*/
153170
final VotingConfiguration newConfig = new VotingConfiguration(
154-
// live nodes first, preferring the current config, and if we need more then use non-live nodes
155-
Stream.of(nonRetiredInConfigLiveIds, nonRetiredLiveNotInConfigIds, nonRetiredInConfigNotLiveIds)
156-
.flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));
171+
// live master first, then other live nodes, preferring the current config, and if we need more then use non-live nodes
172+
Stream.of(nonRetiredInConfigLiveMasterIds, nonRetiredInConfigLiveNotMasterIds, nonRetiredLiveNotInConfigIds,
173+
nonRetiredInConfigNotLiveIds).flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));
157174

158175
if (newConfig.hasQuorum(liveNodeIds)) {
159176
return newConfig;

server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
import org.elasticsearch.test.ESIntegTestCase;
3232
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
3333
import org.elasticsearch.test.ESIntegTestCase.Scope;
34+
import org.elasticsearch.test.InternalTestCluster;
3435
import org.elasticsearch.test.junit.annotations.TestLogging;
3536

3637
import java.io.IOException;
3738
import java.util.Collections;
3839
import java.util.List;
39-
import java.util.concurrent.ExecutionException;
4040

4141
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
4242
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -106,7 +106,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
106106
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName));
107107
}
108108

109-
public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionException, InterruptedException {
109+
public void testElectOnlyBetweenMasterNodes() throws Exception {
110110
logger.info("--> start data node / non master node");
111111
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true)
112112
.put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s"));
@@ -138,7 +138,14 @@ public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionExcep
138138
logger.info("--> closing master node (1)");
139139
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
140140
new AddVotingConfigExclusionsRequest(new String[]{masterNodeName})).get();
141-
internalCluster().stopCurrentMasterNode();
141+
// removing the master from the voting configuration immediately triggers the master to step down
142+
assertBusy(() -> {
143+
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
144+
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
145+
assertThat(internalCluster().masterClient().admin().cluster().prepareState()
146+
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
147+
});
148+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodeName));
142149
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
143150
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
144151
assertThat(internalCluster().masterClient().admin().cluster().prepareState()

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
6767
import org.elasticsearch.transport.TransportService;
6868
import org.hamcrest.Matcher;
69+
import org.hamcrest.core.IsCollectionContaining;
6970
import org.junit.After;
7071
import org.junit.Before;
7172

@@ -1331,6 +1332,8 @@ void stabilise(long stabilisationDurationMillis) {
13311332
final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration();
13321333
assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration,
13331334
lastCommittedConfiguration.hasQuorum(connectedNodeIds));
1335+
assertThat("leader " + leader.getLocalNode() + " should be part of voting configuration " + lastCommittedConfiguration,
1336+
lastCommittedConfiguration.getNodeIds(), IsCollectionContaining.hasItem(leader.getLocalNode().getId()));
13341337

13351338
assertThat("no reconfiguration is in progress",
13361339
lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));

server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
import static org.hamcrest.Matchers.containsInAnyOrder;
5858
import static org.hamcrest.Matchers.containsString;
59+
import static org.hamcrest.Matchers.empty;
5960
import static org.hamcrest.Matchers.equalTo;
6061

6162
public class PublicationTests extends ESTestCase {
@@ -178,6 +179,7 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {
178179
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());
179180

180181
assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes));
182+
assertThat(publication.completedNodes(), empty());
181183
assertTrue(publication.pendingCommits.isEmpty());
182184
AtomicBoolean processedNode1PublishResponse = new AtomicBoolean();
183185
boolean delayProcessingNode2PublishResponse = randomBoolean();
@@ -232,10 +234,12 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {
232234

233235
assertFalse(publication.completed);
234236
assertFalse(publication.committed);
237+
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n3));
235238
publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE);
236239
}
237240

238241
assertTrue(publication.completed);
242+
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n2, n3));
239243
assertTrue(publication.committed);
240244

241245
assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));

0 commit comments

Comments
 (0)