Skip to content

Commit 52a3a19

Browse files
authored
Add low-level bootstrap implementation (#34345)
Today we inject the initial configuration of the cluster (i.e. the set of voting nodes) at startup. In reality we must support injecting the initial configuration after startup too. This commit adds low-level support for doing so as safely as possible.
1 parent ac99d1d commit 52a3a19

File tree

4 files changed

+164
-36
lines changed

4 files changed

+164
-36
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,17 +132,17 @@ public void setInitialState(ClusterState initialState) {
132132
throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion);
133133
}
134134

135-
assert getLastAcceptedTerm() == 0;
136-
assert getLastAcceptedConfiguration().isEmpty();
137-
assert getLastCommittedConfiguration().isEmpty();
138-
assert lastPublishedVersion == 0;
139-
assert lastPublishedConfiguration.isEmpty();
135+
assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm();
136+
assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration();
137+
assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration();
138+
// On failure, report the value actually being checked (the last-published version) rather than the last-accepted version.
assert lastPublishedVersion == 0 : lastPublishedVersion;
139+
assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration;
140140
assert electionWon == false;
141-
assert joinVotes.isEmpty();
142-
assert publishVotes.isEmpty();
141+
assert joinVotes.isEmpty() : joinVotes;
142+
assert publishVotes.isEmpty() : publishVotes;
143143

144-
assert initialState.term() == 0;
145-
assert initialState.version() == 1;
144+
assert initialState.term() == 0 : initialState;
145+
assert initialState.version() == 1 : initialState;
146146
assert initialState.getLastAcceptedConfiguration().isEmpty() == false;
147147
assert initialState.getLastCommittedConfiguration().isEmpty() == false;
148148

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.cluster.ClusterChangedEvent;
2525
import org.elasticsearch.cluster.ClusterName;
2626
import org.elasticsearch.cluster.ClusterState;
27+
import org.elasticsearch.cluster.ClusterState.Builder;
28+
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
2729
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2830
import org.elasticsearch.cluster.block.ClusterBlocks;
2931
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
@@ -63,6 +65,7 @@
6365
import java.util.Set;
6466
import java.util.concurrent.atomic.AtomicLong;
6567
import java.util.function.Supplier;
68+
import java.util.stream.Collectors;
6669

6770
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
6871
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@@ -480,6 +483,8 @@ public void invariant() {
480483
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
481484
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
482485
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
486+
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
487+
: preVoteCollector + " vs " + getPreVoteResponse();
483488
if (mode == Mode.LEADER) {
484489
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
485490

@@ -493,7 +498,6 @@ public void invariant() {
493498
assert leaderCheckScheduler == null : leaderCheckScheduler;
494499
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
495500
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
496-
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
497501

498502
final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
499503
if (becomingMaster && activePublication == false) {
@@ -527,7 +531,6 @@ public void invariant() {
527531
assert followersChecker.getKnownFollowers().isEmpty();
528532
assert currentPublication.map(Publication::isCommitted).orElse(true);
529533
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
530-
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
531534
} else {
532535
assert mode == Mode.CANDIDATE;
533536
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
@@ -540,11 +543,42 @@ public void invariant() {
540543
assert applierState.nodes().getMasterNodeId() == null;
541544
assert currentPublication.map(Publication::isCommitted).orElse(true);
542545
assert preVoteCollector.getLeader() == null : preVoteCollector;
543-
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
544546
}
545547
}
546548
}
547549

550+
/**
 * Sets the initial voting configuration of this node after startup. The whole operation runs while holding {@code mutex}.
 *
 * On success, a new cluster state is built from the current master-service state with its version incremented and with both
 * the last-accepted and last-committed configurations set to {@code votingConfiguration}. That state is passed to
 * {@code CoordinationState#setInitialState}, the pre-vote collector is updated, and the election scheduler is started.
 *
 * @param votingConfiguration the voting configuration to install as the initial configuration
 * @throws CoordinationStateRejectedException if the current state already has a non-empty last-accepted configuration, if this
 *         node is not in mode CANDIDATE, or if the local node together with the peers found so far do not form a quorum of
 *         {@code votingConfiguration}
 */
public void setInitialConfiguration(final VotingConfiguration votingConfiguration) {
551+
synchronized (mutex) {
552+
final ClusterState currentState = getStateForMasterService();
553+
554+
// A non-empty last-accepted configuration means a configuration is already in place, so refuse to set another one.
if (currentState.getLastAcceptedConfiguration().isEmpty() == false) {
555+
throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set");
556+
}
557+
// With an empty configuration the state is expected to still be at term 0 and version 0.
assert currentState.term() == 0 : currentState;
558+
assert currentState.version() == 0 : currentState;
559+
560+
if (mode != Mode.CANDIDATE) {
561+
throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
562+
}
563+
564+
// The known nodes are the local node plus every peer currently reported by the peer finder.
final List<DiscoveryNode> knownNodes = new ArrayList<>();
565+
knownNodes.add(getLocalNode());
566+
peerFinder.getFoundPeers().forEach(knownNodes::add);
567+
// Reject the configuration unless the IDs of the known nodes form a quorum of it.
if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) {
568+
throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
569+
"[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
570+
}
571+
572+
logger.info("setting initial configuration to {}", votingConfiguration);
573+
// Build the initial state: the current state with an incremented version and the new configuration as both
// last-accepted and last-committed.
final Builder builder = masterService.incrementVersion(currentState);
574+
builder.lastAcceptedConfiguration(votingConfiguration);
575+
builder.lastCommittedConfiguration(votingConfiguration);
576+
coordinationState.get().setInitialState(builder.build());
577+
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
578+
startElectionScheduler();
579+
}
580+
}
581+
548582
// for tests
549583
boolean hasJoinVoteFrom(DiscoveryNode localNode) {
550584
return coordinationState.get().containsJoinVoteFor(localNode);
@@ -731,25 +765,7 @@ protected void onFoundPeersUpdated() {
731765

732766
if (foundQuorum) {
733767
if (electionScheduler == null) {
734-
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
735-
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
736-
@Override
737-
public void run() {
738-
synchronized (mutex) {
739-
if (mode == Mode.CANDIDATE) {
740-
if (prevotingRound != null) {
741-
prevotingRound.close();
742-
}
743-
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
744-
}
745-
}
746-
}
747-
748-
@Override
749-
public String toString() {
750-
return "scheduling of new prevoting round";
751-
}
752-
});
768+
startElectionScheduler();
753769
}
754770
} else {
755771
closePrevotingAndElectionScheduler();
@@ -759,6 +775,30 @@ public String toString() {
759775
}
760776
}
761777

778+
/**
 * Creates the election scheduler via {@code electionSchedulerFactory} with a zero grace period and stores it in
 * {@code electionScheduler}. Asserts that no election scheduler is currently set.
 *
 * The task handed to the scheduler, when run, takes {@code mutex} and, only if this node is in mode CANDIDATE, closes any
 * existing pre-voting round and starts a new one from the current last-accepted state and the discovered nodes.
 */
private void startElectionScheduler() {
779+
assert electionScheduler == null : electionScheduler;
780+
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
781+
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
782+
@Override
783+
public void run() {
784+
synchronized (mutex) {
785+
// Pre-voting rounds are only started while in mode CANDIDATE; in any other mode this task does nothing.
if (mode == Mode.CANDIDATE) {
786+
// Close the previous round, if any, before replacing it.
if (prevotingRound != null) {
787+
prevotingRound.close();
788+
}
789+
// Read the last-accepted state at the time the task runs rather than capturing it when the scheduler is created.
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
790+
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
791+
}
792+
}
793+
}
794+
795+
@Override
796+
public String toString() {
797+
return "scheduling of new prevoting round";
798+
}
799+
});
800+
}
801+
762802
class CoordinatorPublication extends Publication {
763803

764804
private final PublishRequest publishRequest;

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas
329329
return newClusterState;
330330
}
331331

332-
protected Builder incrementVersion(ClusterState clusterState) {
332+
/**
 * Returns the result of calling {@code incrementVersion()} on a builder created from the given cluster state via
 * {@code ClusterState.builder(clusterState)}.
 *
 * @param clusterState the cluster state the builder is created from
 * @return the builder, after {@code incrementVersion()} has been called on it
 */
public Builder incrementVersion(ClusterState clusterState) {
333333
return ClusterState.builder(clusterState).incrementVersion();
334334
}
335335

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
import java.util.ArrayList;
5353
import java.util.Arrays;
54+
import java.util.Collections;
5455
import java.util.HashMap;
5556
import java.util.HashSet;
5657
import java.util.List;
@@ -84,12 +85,15 @@
8485
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
8586
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
8687
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
88+
import static org.hamcrest.Matchers.containsString;
8789
import static org.hamcrest.Matchers.empty;
90+
import static org.hamcrest.Matchers.endsWith;
8891
import static org.hamcrest.Matchers.equalTo;
8992
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
9093
import static org.hamcrest.Matchers.is;
9194
import static org.hamcrest.Matchers.lessThanOrEqualTo;
9295
import static org.hamcrest.Matchers.not;
96+
import static org.hamcrest.Matchers.startsWith;
9397

9498
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
9599
public class CoordinatorTests extends ESTestCase {
@@ -404,6 +408,61 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() {
404408
// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
405409
}
406410

411+
// Runs a cluster of 1-5 nodes through an initial discovery phase, checks that every node is still an unbootstrapped
// CANDIDATE (term 0, version 0, empty configurations), then applies the initial configuration on one node and
// stabilises the cluster within a bounded time.
public void testSettingInitialConfigurationTriggersElection() {
412+
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
413+
cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase");
414+
// Before the initial configuration is set, no node may have left its initial state.
for (final ClusterNode clusterNode : cluster.clusterNodes) {
415+
final String nodeId = clusterNode.getId();
416+
assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE));
417+
assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L));
418+
assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L));
419+
assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L));
420+
assertTrue(nodeId + " has an empty last-accepted configuration",
421+
clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty());
422+
assertTrue(nodeId + " has an empty last-committed configuration",
423+
clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty());
424+
}
425+
426+
cluster.getAnyNode().applyInitialConfiguration();
427+
cluster.stabilise(defaultMillis(
428+
// the first election should succeed, because only one node knows of the initial configuration and therefore can win a
429+
// pre-voting round and proceed to an election, so there cannot be any collisions
430+
ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately
431+
// Allow two round-trips, one for pre-voting and one for voting
432+
+ 4 * DEFAULT_DELAY_VARIABILITY
433+
// Then a commit of the new leader's first cluster state
434+
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
435+
}
436+
437+
// Checks that, once the cluster has run randomly and stabilised, calling setInitialConfiguration again on a node is
// rejected with the "configuration has already been set" message.
public void testCannotSetInitialConfigurationTwice() {
438+
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
439+
cluster.runRandomly();
440+
cluster.stabilise();
441+
442+
final Coordinator coordinator = cluster.getAnyNode().coordinator;
443+
// Re-submit the node's own last-committed configuration; the call is expected to throw.
final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class,
444+
() -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration()));
445+
446+
assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set"));
447+
}
448+
449+
// Checks that an initial configuration consisting only of an unknown node ID is rejected with a "not enough nodes
// discovered" message that names the local node, and that a later attempt with a configuration containing only the
// local node is accepted and lets the cluster stabilise.
public void testCannotSetInitialConfigurationWithoutQuorum() {
450+
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
451+
final Coordinator coordinator = cluster.getAnyNode().coordinator;
452+
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node"));
453+
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
454+
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
455+
// The message is checked in pieces because the rendering of the known nodes in the middle is not fixed by this test.
assertThat(exceptionMessage,
456+
startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=["));
457+
assertThat(exceptionMessage,
458+
endsWith("], VotingConfiguration{unknown-node}]"));
459+
assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString()));
460+
461+
// This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum.
462+
coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId())));
463+
cluster.stabilise();
464+
}
465+
407466
/**
 * Returns the value of the given time setting read from {@code Settings.EMPTY} (presumably its default value), in
 * milliseconds, plus {@code Cluster.DEFAULT_DELAY_VARIABILITY}.
 *
 * @param setting the time-valued setting to read
 * @return the setting's value in milliseconds plus the default delay variability
 */
private static long defaultMillis(Setting<TimeValue> setting) {
408467
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
409468
}
@@ -555,6 +614,14 @@ void runRandomly() {
555614
}
556615
break;
557616
}
617+
} else if (rarely()) {
618+
final ClusterNode clusterNode = getAnyNode();
619+
onNode(clusterNode.getLocalNode(),
620+
() -> {
621+
logger.debug("----> [runRandomly {}] applying initial configuration {} to {}",
622+
thisStep, initialConfiguration, clusterNode.getId());
623+
clusterNode.coordinator.setInitialConfiguration(initialConfiguration);
624+
}).run();
558625
} else {
559626
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
560627
deterministicTaskQueue.advanceTime();
@@ -566,7 +633,6 @@ void runRandomly() {
566633
// TODO other random steps:
567634
// - reboot a node
568635
// - abdicate leadership
569-
// - bootstrap
570636

571637
} catch (CoordinationStateRejectedException ignored) {
572638
// This is ok: it just means a message couldn't currently be handled.
@@ -606,6 +672,17 @@ void stabilise() {
606672
void stabilise(long stabilisationDurationMillis) {
607673
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
608674
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
675+
676+
if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) {
677+
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
678+
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
679+
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
680+
final ClusterNode bootstrapNode = getAnyNode();
681+
bootstrapNode.applyInitialConfiguration();
682+
} else {
683+
logger.info("setting initial configuration not required");
684+
}
685+
609686
runFor(stabilisationDurationMillis, "stabilising");
610687
fixLag();
611688
assertUniqueLeaderAndExpectedModes();
@@ -706,7 +783,7 @@ private void assertUniqueLeaderAndExpectedModes() {
706783

707784
// Returns a randomly chosen node from those cluster nodes for which ClusterNode::isLeader holds, after asserting
// that at least one such node exists.
ClusterNode getAnyLeader() {
708785
List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList());
709-
assertThat(allLeaders, not(empty()));
786+
assertThat("leaders", allLeaders, not(empty()));
710787
return randomFrom(allLeaders);
711788
}
712789

@@ -759,8 +836,8 @@ class ClusterNode extends AbstractComponent {
759836
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
760837
this.nodeIndex = nodeIndex;
761838
localNode = createDiscoveryNode();
762-
persistedState = new InMemoryPersistedState(1L,
763-
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
839+
persistedState = new InMemoryPersistedState(0L,
840+
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
764841
onNode(localNode, this::setUp).run();
765842
}
766843

@@ -917,6 +994,17 @@ ClusterState getLastAppliedClusterState() {
917994
return clusterApplier.lastAppliedClusterState;
918995
}
919996

997+
// Attempts to set the shared initialConfiguration on this node's coordinator, wrapped via onNode(localNode, ...).
// A CoordinationStateRejectedException is not propagated: both success and rejection are only logged at INFO level.
void applyInitialConfiguration() {
998+
onNode(localNode, () -> {
999+
try {
1000+
coordinator.setInitialConfiguration(initialConfiguration);
1001+
logger.info("successfully set initial configuration to {}", initialConfiguration);
1002+
} catch (CoordinationStateRejectedException e) {
1003+
// Rejection is deliberately tolerated here; it is logged together with the exception.
logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e);
1004+
}
1005+
}).run();
1006+
}
1007+
9201008
private class FakeClusterApplier implements ClusterApplier {
9211009

9221010
final ClusterName clusterName;

0 commit comments

Comments
 (0)