Merged
Changes from 82 commits
90 commits
34ea2b0
[Zen2] Support rolling upgrades from Zen1
DaveCTurner Nov 19, 2018
1ec1f8b
CheckStyle
DaveCTurner Nov 20, 2018
32a3d82
Moar test
DaveCTurner Nov 21, 2018
bbb700f
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Nov 21, 2018
0678ebe
Post-merge fixup
DaveCTurner Nov 21, 2018
48888e0
Bogus assertion
DaveCTurner Nov 21, 2018
bb7a04c
Avoid problematic case
DaveCTurner Nov 21, 2018
7eb0137
Add restart tests as well as migration ones
DaveCTurner Nov 21, 2018
5595d2b
Imports
DaveCTurner Nov 21, 2018
7ea3769
Better restart tests
DaveCTurner Nov 22, 2018
6c3bb69
Must claim unknown version
DaveCTurner Nov 22, 2018
abe570e
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Nov 22, 2018
c83bb34
Suppress exceptions during upgrade bootstrap
DaveCTurner Nov 22, 2018
04208ab
Fake ping ID 0 for remote pings
DaveCTurner Nov 22, 2018
491349a
Use FAKE_PING_ID to describe the situation
DaveCTurner Nov 22, 2018
b218215
Imports
DaveCTurner Nov 22, 2018
98e0882
Add TODOs
DaveCTurner Nov 22, 2018
a2d609d
Whitespace
DaveCTurner Nov 26, 2018
ae2b7fb
Revert bootstrapping condition
DaveCTurner Nov 26, 2018
5f5aeff
Revert "Revert bootstrapping condition"
DaveCTurner Nov 26, 2018
980e1a3
Set default for use_zen2 to true
DaveCTurner Nov 26, 2018
c1b69aa
Wait for master election before restarting node
DaveCTurner Nov 26, 2018
14256d0
Imports
DaveCTurner Nov 26, 2018
63bbe70
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Nov 26, 2018
fd5ce9f
Bah
DaveCTurner Nov 26, 2018
ea2cb3d
Don't use ZEN2 in InternalTestClusterTests yet
DaveCTurner Nov 26, 2018
59c1083
Add bootstrapping parameter
DaveCTurner Nov 26, 2018
03cd1bb
Need to configure file-based discovery
DaveCTurner Nov 26, 2018
59b8e91
Don't use Zen2 in CCR integ tests (yet)
DaveCTurner Nov 26, 2018
bdd13ac
Another cluster needing file-based discovery
DaveCTurner Nov 27, 2018
bf614ce
File-based discovery should not be using settings-based discovery
DaveCTurner Nov 27, 2018
249231c
Suppress election scheduling on non-master nodes
DaveCTurner Nov 27, 2018
fd11436
Do not activate discovery upgrade service if we have already left term 0
DaveCTurner Nov 27, 2018
472615a
Add assertions and comment
DaveCTurner Nov 27, 2018
fd23114
Tidy up
DaveCTurner Nov 27, 2018
0c2c14e
Unused method
DaveCTurner Nov 27, 2018
af70cda
Be tolerant of version mismatches in compatibility mode
DaveCTurner Nov 28, 2018
a1e393f
Fix comment, no longer a TODO
DaveCTurner Nov 28, 2018
689dbd8
Only elect master-eligible nodes
DaveCTurner Nov 28, 2018
0b582a3
Nonsense, it's fine
DaveCTurner Nov 28, 2018
1090db8
Update default for USE_ZEN2 to true
DaveCTurner Nov 28, 2018
48eb72c
Cannot request pre-votes from Zen1 nodes, nor include them in the con…
DaveCTurner Nov 28, 2018
0d40f93
Revert "Cannot request pre-votes from Zen1 nodes, nor include them in…
DaveCTurner Nov 28, 2018
63432f3
Cannot request pre-votes from Zen1 nodes, nor include them in the con…
DaveCTurner Nov 28, 2018
ace9194
Don't apply USE_ZEN2 if not using TestZenDiscovery
DaveCTurner Nov 28, 2018
126b4b4
Manipulate voting tombstones when restarting nodes in a mixed cluster
DaveCTurner Nov 28, 2018
71f2b03
Don't fiddle with tombstones when there's only 1 Zen2 node
DaveCTurner Nov 28, 2018
c93c05a
Revert - we check that the cluster state is consistent, and this fails
DaveCTurner Nov 28, 2018
2de0feb
Extend CoordinatorTests to support master-ineligible nodes
DaveCTurner Nov 28, 2018
7bdc19a
Make the use of Zen2 a test-suite-wide property
DaveCTurner Nov 28, 2018
1b09320
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Nov 28, 2018
d976106
Merge branch '2018-11-19-default-to-zen2-in-tests' into 2018-11-19-ro…
DaveCTurner Nov 28, 2018
a404c45
Merge branch '2018-11-28-only-elect-master-eligible-nodes' into 2018-…
DaveCTurner Nov 28, 2018
6cc6c8b
Merge branch '2018-11-19-rolling-upgrade-to-zen2' of github.com:DaveC…
DaveCTurner Nov 28, 2018
1055e64
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Nov 29, 2018
7c4adc1
Unnecessary tests
DaveCTurner Nov 29, 2018
f078d9a
Add tests for start-all-stop-all migrations
DaveCTurner Nov 30, 2018
fe51316
Zen2 cluster should reject publications from Zen1
DaveCTurner Nov 30, 2018
2499fe1
Ignore ResourceAlreadyExistsException
DaveCTurner Nov 30, 2018
55841ab
Revert "Ignore ResourceAlreadyExistsException"
DaveCTurner Nov 30, 2018
eeef3bd
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Dec 4, 2018
82a28a6
Merge branch 'zen2' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Dec 4, 2018
2e9c084
Merge branch 'master' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Dec 6, 2018
e95c0bf
Fixup for rename
DaveCTurner Dec 6, 2018
6dd133a
Revert "Zen2 cluster should reject publications from Zen1"
DaveCTurner Dec 6, 2018
7f211d4
Un-revert the useful bits
DaveCTurner Dec 6, 2018
d9c1ca4
Actually, this is fine, we might have bumped a term (e.g. due to foll…
DaveCTurner Dec 6, 2018
13f71c5
Line length
DaveCTurner Dec 6, 2018
4ec0cec
Merge branch 'master' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Dec 7, 2018
67ac92e
_rolling_ upgrade
DaveCTurner Dec 7, 2018
c01216b
Revert
DaveCTurner Dec 7, 2018
5d11689
Revert
DaveCTurner Dec 7, 2018
0958cdb
Don't send PeersRequest to local node
DaveCTurner Dec 7, 2018
b0ab023
Merge branch 'master' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Dec 7, 2018
3435c73
Horrible hack to force Zen1 nodes not to elect Zen2 masters
DaveCTurner Dec 7, 2018
d5ba1c7
Don't send incomprehensible messages from the future
DaveCTurner Dec 7, 2018
1cfe428
Maybe more shards
DaveCTurner Dec 7, 2018
4b7b899
Compile error
DaveCTurner Dec 7, 2018
a96aa91
Simplify testMixedClusterFormation and avoid the 1+1 case
DaveCTurner Dec 7, 2018
d0a23d4
Fewer devices
DaveCTurner Dec 7, 2018
ed0a5ca
Randomise shards not nodes, d'oh
DaveCTurner Dec 7, 2018
497c5cf
Merge branch 'master' into 2018-11-19-rolling-upgrade-to-zen2
DaveCTurner Dec 7, 2018
5d2ba53
static
DaveCTurner Dec 7, 2018
d1702b4
Won't do this TODO
DaveCTurner Dec 7, 2018
003b218
promote logging
DaveCTurner Dec 7, 2018
212049c
Add test that impossibly high id is impossibly high
DaveCTurner Dec 7, 2018
3da03f7
No TestLogging
DaveCTurner Dec 7, 2018
49a41d1
Don't run all the tests
DaveCTurner Dec 7, 2018
af11d61
Inline
DaveCTurner Dec 7, 2018
053d746
Only be lenient if the master changes
DaveCTurner Dec 7, 2018
@@ -32,6 +32,8 @@
import java.util.Map;
import java.util.Optional;

+ import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;

/**
* The core class of the cluster state coordination algorithm, directly implementing the
* <a href="https://github.com/elastic/elasticsearch-formal-models/blob/master/ZenWithTerms/tla/ZenWithTerms.tla">formal model</a>
@@ -321,10 +323,15 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
getCurrentTerm());
}
if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) {
- logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
- getLastAcceptedVersion(), clusterState.version());
- throw new CoordinationStateRejectedException("incoming version " + clusterState.version() +
- " lower or equal to current version " + getLastAcceptedVersion());
+ if (clusterState.term() == ZEN1_BWC_TERM) {
Review comment (Contributor):

I wonder if we can somehow avoid putting this extra condition in this class, perhaps by creating fresh persistedstate / coordinationstate instances. I also wonder if we should enforce the version semantics as long as the states are coming from the same master (i.e. same ephemeral id).

Review comment (Contributor):

On reflection, I think that creating a fresh persistedstate / coordinationstate is more cumbersome than the alternative. It would still be good to enforce the version semantics here if the cluster states are from the same master (i.e. same ephemeral id).

+ logger.debug("handling publish request in compatibility mode despite version mismatch (expected: >[{}], actual: [{}])",
+ getLastAcceptedVersion(), clusterState.version());
+ } else {
+ logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
+ getLastAcceptedVersion(), clusterState.version());
+ throw new CoordinationStateRejectedException("incoming version " + clusterState.version() +
+ " lower or equal to current version " + getLastAcceptedVersion());
+ }
}

logger.trace("handlePublishRequest: accepting publish request for version [{}] and term [{}]",
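The review comments on this hunk suggest keeping the usual version check whenever consecutive cluster states come from the same master, and tolerating a version mismatch only in compatibility mode (term 0). The following is a minimal Java sketch of that rule, assuming the publishing master can be identified by an ephemeral ID string. `PublishVersionCheck`, `shouldAccept`, and all parameter names are hypothetical and introduced only for illustration; they are not code from this PR, and the merged implementation may differ.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the version check discussed in the review.
final class PublishVersionCheck {
    // Same value as Coordinator.ZEN1_BWC_TERM in the diff.
    static final long ZEN1_BWC_TERM = 0L;

    /**
     * Returns true if the incoming publication passes the version check.
     * The ephemeral-ID parameters are an assumption: the diff shown here does not
     * say how the "same master" comparison would be carried out.
     */
    static boolean shouldAccept(long term, long version, String masterEphemeralId,
                                long lastAcceptedTerm, long lastAcceptedVersion,
                                String lastMasterEphemeralId) {
        if (term != lastAcceptedTerm || version > lastAcceptedVersion) {
            // The version check only applies within one term, and a strictly
            // newer version always passes it. Other term checks are out of scope.
            return true;
        }
        // Same term with an equal or lower version: tolerated only in
        // compatibility mode, and only if the publishing master has changed.
        boolean masterChanged = !Objects.equals(masterEphemeralId, lastMasterEphemeralId);
        return term == ZEN1_BWC_TERM && masterChanged;
    }

    public static void main(String[] args) {
        // Term 0, lower version, different master: accepted leniently.
        System.out.println(shouldAccept(0L, 5L, "master-b", 0L, 7L, "master-a"));
        // Term 0, lower version, same master: version semantics still enforced.
        System.out.println(shouldAccept(0L, 5L, "master-a", 0L, 7L, "master-a"));
    }
}
```

Compared with the unconditional leniency in the hunk above, this variant keeps the monotonic-version guarantee for any single Zen1 master and relaxes it only across a master change.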
@@ -44,6 +44,7 @@
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
+ import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@@ -86,6 +87,9 @@
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

public class Coordinator extends AbstractLifecycleComponent implements Discovery {

+ public static final long ZEN1_BWC_TERM = 0;

private static final Logger logger = LogManager.getLogger(Coordinator.class);

// the timeout for the publication of each value
@@ -121,6 +125,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private long maxTermSeen;
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;
+ private final DiscoveryUpgradeService discoveryUpgradeService;
private final LagDetector lagDetector;
private final ClusterFormationFailureHelper clusterFormationFailureHelper;

@@ -161,6 +166,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
+ this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isBootstrapped,
+ joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
@@ -256,6 +263,14 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest);
}

+ if (publishRequest.getAcceptedState().term() == ZEN1_BWC_TERM && getCurrentTerm() == ZEN1_BWC_TERM
+ && mode == Mode.FOLLOWER && Optional.of(sourceNode).equals(lastKnownLeader) == false) {
+
+ logger.debug("received cluster state from {} but currently following {}, rejecting", sourceNode, lastKnownLeader);
+ throw new CoordinationStateRejectedException("received cluster state from " + sourceNode + " but currently following "
+ + lastKnownLeader + ", rejecting");
+ }

ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
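
The guard added to `handlePublishRequest` in this hunk can be read as a single predicate: a node that is a follower in term 0 rejects a term-0 publication from any node other than its last-known leader. The sketch below restates that condition in isolation, using plain strings in place of `DiscoveryNode`. `Zen1FollowerGuard` and `shouldReject` are hypothetical names introduced for illustration only.

```java
import java.util.Optional;

// Hypothetical restatement of the term-0 follower guard from the hunk above.
final class Zen1FollowerGuard {
    // Same value as Coordinator.ZEN1_BWC_TERM in the diff.
    static final long ZEN1_BWC_TERM = 0L;

    enum Mode { CANDIDATE, LEADER, FOLLOWER }

    /** Returns true if the publication should be rejected by this guard. */
    static boolean shouldReject(long publishedTerm, long currentTerm, Mode mode,
                                String sourceNode, Optional<String> lastKnownLeader) {
        return publishedTerm == ZEN1_BWC_TERM
            && currentTerm == ZEN1_BWC_TERM
            && mode == Mode.FOLLOWER
            && !Optional.of(sourceNode).equals(lastKnownLeader);
    }

    public static void main(String[] args) {
        // A follower of node-1 receives a term-0 state from node-2.
        System.out.println(shouldReject(0L, 0L, Mode.FOLLOWER, "node-2", Optional.of("node-1")));
        // The same follower receives a term-0 state from its own leader.
        System.out.println(shouldReject(0L, 0L, Mode.FOLLOWER, "node-1", Optional.of("node-1")));
    }
}
```

Because the guard only fires when both terms equal `ZEN1_BWC_TERM`, it never triggers for publications in term 1 or later.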

@@ -323,7 +338,11 @@ private void startElection() {
final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
- getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
+ getDiscoveredNodes().forEach(node -> {
+ if (isZen1Node(node) == false) {
+ joinHelper.sendStartJoinRequest(startJoinRequest, node);
+ }
+ });
}
}
}
@@ -384,6 +403,11 @@ void becomeCandidate(String method) {

peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();

+ if (getCurrentTerm() == ZEN1_BWC_TERM) {
+ discoveryUpgradeService.activate(lastKnownLeader);
+ }

leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);

@@ -414,6 +438,7 @@ void becomeLeader(String method) {

lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
+ discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
@@ -439,6 +464,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {

lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
+ discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
@@ -647,9 +673,6 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
return false;
}

- assert currentState.term() == 0 : currentState;
- assert currentState.version() == 0 : currentState;

if (mode != Mode.CANDIDATE) {
throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
}
@@ -681,12 +704,60 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
}
}

+ private boolean isBootstrapped() {
+ return getLastAcceptedState().getLastAcceptedConfiguration().isEmpty() == false;
+ }
+
+ private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) {
+ assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed";
+ synchronized (mutex) {
+ if (mode != Mode.CANDIDATE) {
+ throw new IllegalStateException("Cannot overwrite configuration in mode " + mode);
+ }
+
+ if (isBootstrapped()) {
+ throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to "
+ + getLastAcceptedState().getLastAcceptedConfiguration());
+ }
+
+ if (lastKnownLeader.map(Coordinator::isZen1Node).orElse(false) == false) {
+ throw new IllegalStateException("Cannot upgrade from last-known leader: " + lastKnownLeader);
+ }
+
+ if (getCurrentTerm() != ZEN1_BWC_TERM) {
+ throw new IllegalStateException("Cannot upgrade, term is " + getCurrentTerm());
+ }
+
+ logger.info("automatically bootstrapping during rolling upgrade, using initial configuration {}", votingConfiguration);
+
+ final long newTerm = 1;
+ final ClusterState currentState = getStateForMasterService();
+ final Builder builder = masterService.incrementVersion(currentState);
+ builder.metaData(MetaData.builder(currentState.metaData()).coordinationMetaData(
+ CoordinationMetaData.builder(currentState.metaData().coordinationMetaData())
+ .term(newTerm)
+ .lastAcceptedConfiguration(votingConfiguration)
+ .lastCommittedConfiguration(votingConfiguration)
+ .build()));
+ final ClusterState newClusterState = builder.build();
+
+ coordinationState.get().handleStartJoin(new StartJoinRequest(getLocalNode(), newClusterState.term()));
+ coordinationState.get().handlePublishRequest(new PublishRequest(newClusterState));
+
+ followersChecker.clearCurrentNodes();
+ followersChecker.updateFastResponseState(getCurrentTerm(), mode);
+
+ peerFinder.deactivate(getLocalNode());
+ peerFinder.activate(newClusterState.nodes());
+ }
+ }

// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
- .filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
+ .filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
clusterState.getLastAcceptedConfiguration());
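
The four precondition checks at the top of `unsafelySetConfigurationForUpgrade` in this hunk run in a fixed order, and the first one that fails decides the error. The sketch below models only that ordering, returning the refusal reason instead of throwing. `UpgradeBootstrapGuards`, `refusalReason`, and the boolean and `Optional<Boolean>` parameters are hypothetical simplifications of the real `Coordinator` state, and the messages are shortened.

```java
import java.util.Optional;

// Hypothetical model of the guard ordering in unsafelySetConfigurationForUpgrade.
final class UpgradeBootstrapGuards {
    // Same value as Coordinator.ZEN1_BWC_TERM in the diff.
    static final long ZEN1_BWC_TERM = 0L;

    enum Mode { CANDIDATE, LEADER, FOLLOWER }

    /**
     * Returns the reason the automatic bootstrap must be refused,
     * or Optional.empty() if every guard passes.
     */
    static Optional<String> refusalReason(Mode mode, boolean bootstrapped,
                                          Optional<Boolean> lastKnownLeaderIsZen1,
                                          long currentTerm) {
        if (mode != Mode.CANDIDATE) {
            return Optional.of("Cannot overwrite configuration in mode " + mode);
        }
        if (bootstrapped) {
            return Optional.of("Cannot overwrite configuration: configuration is already set");
        }
        // An unknown leader (empty Optional) is treated like a non-Zen1 leader.
        if (!lastKnownLeaderIsZen1.orElse(false)) {
            return Optional.of("Cannot upgrade from last-known leader");
        }
        if (currentTerm != ZEN1_BWC_TERM) {
            return Optional.of("Cannot upgrade, term is " + currentTerm);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(refusalReason(Mode.CANDIDATE, false, Optional.of(true), 0L));
        System.out.println(refusalReason(Mode.FOLLOWER, false, Optional.of(true), 0L));
    }
}
```

Only a candidate that has no voting configuration, last followed a Zen1 master, and is still in term 0 passes all four guards.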
@@ -967,7 +1038,9 @@ public void run() {
prevotingRound.close();
}
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
- prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
+ final List<DiscoveryNode> discoveredNodes
+ = getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());
+ prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
}
}
}
@@ -1176,13 +1249,13 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
}

// TODO: only here temporarily for BWC development, remove once complete
- public static Settings.Builder addZen1Attribute(Settings.Builder builder) {
- return builder.put("node.attr.zen1", true);
+ public static Settings.Builder addZen1Attribute(boolean isZen1Node, Settings.Builder builder) {
+ return builder.put("node.attr.zen1", isZen1Node);
}

// TODO: only here temporarily for BWC development, remove once complete
public static boolean isZen1Node(DiscoveryNode discoveryNode) {
return discoveryNode.getVersion().before(Version.V_7_0_0) ||
- discoveryNode.getAttributes().containsKey("zen1");
+ (Booleans.isTrue(discoveryNode.getAttributes().getOrDefault("zen1", "false")));
}
}
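
The last hunk changes the `zen1` attribute test from key presence to attribute value. This matters because `addZen1Attribute` now always writes the attribute, so a node configured with `node.attr.zen1: false` would still count as Zen1 under the old `containsKey` check. The sketch below contrasts the two checks on a plain attribute map. `Zen1AttributeCheck` and its method names are hypothetical, the version comparison from `isZen1Node` is left out, and the string comparison stands in for `Booleans.isTrue` on the assumption that it matches only the literal `"true"`.

```java
import java.util.Map;

// Hypothetical comparison of the old and new "zen1" attribute checks.
final class Zen1AttributeCheck {
    /** Old behaviour in the diff: any value counts, as long as the key exists. */
    static boolean byKeyPresence(Map<String, String> attributes) {
        return attributes.containsKey("zen1");
    }

    /**
     * New behaviour in the diff: the value must be true, defaulting to "false".
     * "true".equals(...) is an assumed stand-in for Booleans.isTrue.
     */
    static boolean byValue(Map<String, String> attributes) {
        return "true".equals(attributes.getOrDefault("zen1", "false"));
    }

    public static void main(String[] args) {
        Map<String, String> explicitFalse = Map.of("zen1", "false");
        System.out.println("containsKey check: " + byKeyPresence(explicitFalse));
        System.out.println("value check:       " + byValue(explicitFalse));
    }
}
```

The two checks agree when the attribute is absent or set to `true`, and differ only for an explicit `false`.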