From 2601abac1d588819f983cc0b98384b43ee0f3095 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Sat, 6 Oct 2018 09:56:36 +0200 Subject: [PATCH 1/9] fast disconnect --- .../cluster/coordination/Coordinator.java | 27 ++-- .../coordination/FollowersChecker.java | 63 +++++--- .../cluster/coordination/LeaderChecker.java | 58 +++++-- .../coordination/CoordinatorTests.java | 149 ++++++++++-------- .../coordination/FollowersCheckerTests.java | 61 ++++++- .../coordination/LeaderCheckerTests.java | 35 +++- .../test/transport/MockTransport.java | 17 +- 7 files changed, 274 insertions(+), 136 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 4b6fcc690b159..5a285654c106a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -101,8 +101,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Releasable electionScheduler; @Nullable private Releasable prevotingRound; - @Nullable - private Releasable leaderCheckScheduler; private long maxTermSeen; private Mode mode; @@ -132,7 +130,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest, this::handleApplyCommit); this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); - this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure); + this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); @@ -154,11 +152,11 @@ public String toString() { }; } - private void onFollowerFailure(DiscoveryNode discoveryNode) { + private void removeNode(DiscoveryNode discoveryNode, String reason) { synchronized (mutex) { if (mode == Mode.LEADER) { masterService.submitStateUpdateTask("node-left", - new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"), + new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason), ClusterStateTaskConfig.build(Priority.IMMEDIATE), nodeRemovalExecutor, nodeRemovalExecutor); @@ -344,11 +342,7 @@ void becomeCandidate(String method) { peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); - - if (leaderCheckScheduler != null) { - leaderCheckScheduler.close(); - leaderCheckScheduler = null; - } + leaderChecker.updateLeader(null); followersChecker.clearCurrentNodes(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); @@ -377,7 +371,7 @@ void becomeLeader(String method) { closePrevotingAndElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); - assert leaderCheckScheduler == null : leaderCheckScheduler; + assert leaderChecker.leader() == null : leaderChecker.leader(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); } @@ -401,10 +395,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { preVoteCollector.update(getPreVoteResponse(), leaderNode); if (restartLeaderChecker) { - if (leaderCheckScheduler != null) { - leaderCheckScheduler.close(); - } - leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode); + leaderChecker.updateLeader(leaderNode); } followersChecker.clearCurrentNodes(); @@ -501,7 +492,7 @@ public void invariant() { assert electionScheduler == null : electionScheduler; assert prevotingRound == null : prevotingRound; assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService(); - assert leaderCheckScheduler == null : leaderCheckScheduler; + assert leaderChecker.leader() == null : leaderChecker.leader(); assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; @@ -533,7 +524,7 @@ public void invariant() { assert prevotingRound == null : prevotingRound; assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); assert leaderChecker.currentNodeIsMaster() == false; - assert leaderCheckScheduler != null; + assert lastKnownLeader.equals(Optional.of(leaderChecker.leader())); assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; @@ -544,7 +535,7 @@ public void invariant() { assert prevotingRound == null || electionScheduler != null; assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); assert leaderChecker.currentNodeIsMaster() == false; - assert leaderCheckScheduler == null : leaderCheckScheduler; + assert leaderChecker.leader() == null : leaderChecker.leader(); assert followersChecker.getKnownFollowers().isEmpty(); assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index c2cd05bc34e29..43f4a3c054276 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -33,6 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -46,6 +47,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; @@ -78,7 +80,7 @@ public class FollowersChecker extends AbstractComponent { private final TimeValue followerCheckInterval; private final TimeValue followerCheckTimeout; private final int followerCheckRetryCount; - private final Consumer onNodeFailure; + private final BiConsumer onNodeFailure; private final Consumer handleRequestAndUpdateState; private final Object mutex = new Object(); // protects writes to this state; read access does not need sync @@ -91,7 +93,7 @@ public class FollowersChecker extends AbstractComponent { public FollowersChecker(Settings settings, TransportService transportService, Consumer handleRequestAndUpdateState, - Consumer onNodeFailure) { + BiConsumer onNodeFailure) { super(settings); this.transportService = transportService; this.handleRequestAndUpdateState = handleRequestAndUpdateState; @@ -104,6 +106,12 @@ public FollowersChecker(Settings settings, TransportService transportService, updateFastResponseState(0, Mode.CANDIDATE); transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new, (request, transportChannel, task) -> handleFollowerCheck(request, transportChannel)); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + handleDisconnectedNode(node); + } + }); } /** @@ -228,6 +236,15 @@ Set getKnownFollowers() { } } + private void handleDisconnectedNode(DiscoveryNode discoveryNode) { + synchronized (mutex) { + FollowerChecker followerChecker = followerCheckers.get(discoveryNode); + if (followerChecker != null && followerChecker.running()) { + followerChecker.failNode(); + } + } + } + static class FastResponseState { final long term; final Mode mode; @@ -314,25 +331,7 @@ public void handleException(TransportException exp) { return; } - transportService.getThreadPool().generic().execute(new Runnable() { - @Override - public void run() { - synchronized (mutex) { - if (running() == false) { - logger.debug("{} no longer running, not marking faulty", FollowerChecker.this); - return; - } - faultyNodes.add(discoveryNode); - followerCheckers.remove(discoveryNode); - } - onNodeFailure.accept(discoveryNode); - } - - @Override - public String toString() { - return "detected failure of " + discoveryNode; - } - }); + failNode(); } @@ -343,6 +342,28 @@ public String executor() { }); } + void failNode() { + transportService.getThreadPool().generic().execute(new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (running() == false) { + logger.debug("{} condition no longer applies, not marking faulty", discoveryNode); + return; + } + faultyNodes.add(discoveryNode); + followerCheckers.remove(discoveryNode); + } + onNodeFailure.accept(discoveryNode, "followers_checker"); + } + + @Override + public String toString() { + return "detected failure of " + discoveryNode; + } + }); + } + private void scheduleNextWakeUp() { transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index fca23018a4f75..60fa88f5acc90 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -46,6 +48,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are @@ -77,6 +80,8 @@ public class LeaderChecker extends AbstractComponent { private final TransportService transportService; private final Runnable onLeaderFailure; + private AtomicReference currentChecker = new AtomicReference<>(); + private volatile DiscoveryNodes discoveryNodes; public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { @@ -88,19 +93,39 @@ public LeaderChecker(final Settings settings, final TransportService transportSe this.onLeaderFailure = onLeaderFailure; transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + handleDisconnectedNode(node); + } + }); + } + + public DiscoveryNode leader() { + CheckScheduler checkScheduler = currentChecker.get(); + return checkScheduler == null ? null : checkScheduler.leader; } /** - * Start a leader checker for the given leader. Should only be called after successfully joining this leader. + * Starts and / or stops a leader checker for the given leader. Should only be called after successfully joining this leader. * - * @param leader the node to be checked as leader - * @return a `Releasable` that can be used to stop this checker. + * @param leader the node to be checked as leader, or null if checks should be disabled */ - public Releasable startLeaderChecker(final DiscoveryNode leader) { - assert transportService.getLocalNode().equals(leader) == false; - CheckScheduler checkScheduler = new CheckScheduler(leader); - checkScheduler.handleWakeUp(); - return checkScheduler; + public void updateLeader(@Nullable final DiscoveryNode leader) { + assert leader == null || transportService.getLocalNode().equals(leader) == false; + final CheckScheduler checkScheduler; + if (leader != null) { + checkScheduler = new CheckScheduler(leader); + } else { + checkScheduler = null; + } + CheckScheduler previousChecker = currentChecker.getAndSet(checkScheduler); + if (previousChecker != null) { + previousChecker.close(); + } + if (checkScheduler != null) { + checkScheduler.handleWakeUp(); + } } /** @@ -137,6 +162,15 @@ private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel tran } } + private void handleDisconnectedNode(DiscoveryNode discoveryNode) { + CheckScheduler checkScheduler = currentChecker.get(); + if (checkScheduler != null) { + checkScheduler.handleDisconnectedNode(discoveryNode); + } else { + logger.trace("disconnect event ignored for {}, no check scheduler", discoveryNode); + } + } + private class CheckScheduler implements Releasable { private final AtomicBoolean isClosed = new AtomicBoolean(); @@ -222,7 +256,7 @@ public String executor() { }); } - private void leaderFailed() { + void leaderFailed() { if (isClosed.compareAndSet(false, true)) { transportService.getThreadPool().generic().execute(onLeaderFailure); } else { @@ -230,6 +264,12 @@ private void leaderFailed() { } } + void handleDisconnectedNode(DiscoveryNode discoveryNode) { + if (discoveryNode.equals(leader)) { + leaderFailed(); + } + } + private void scheduleNextWakeUp() { logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval); transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 9dad2390032e3..1b922e1e025a7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -152,31 +152,38 @@ public void testLeaderDisconnectionDetectedQuickly() { final ClusterNode originalLeader = cluster.getAnyLeader(); logger.info("--> disconnecting leader {}", originalLeader); originalLeader.disconnect(); - - cluster.stabilise(Math.max( - // Each follower may have just sent a leader check, which receives no response - // TODO not necessary if notified of disconnection - defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) - // then wait for the follower to check the leader - + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) - // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for a new election + boolean followersGetDisconnectEvent = randomBoolean(); + if (followersGetDisconnectEvent) { + logger.info("--> followers get disconnect event for leader {} ", originalLeader); + cluster.getAllNodesExcept(originalLeader).forEach(cn -> cn.onDisconnectEventFrom(originalLeader)); + // turn leader into candidate, which stabilisation asserts at the end + cluster.getAllNodesExcept(originalLeader).forEach(cn -> originalLeader.onDisconnectEventFrom(cn)); + cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + DEFAULT_ELECTION_DELAY - // then wait for the old leader's removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, - - // ALSO the leader may have just sent a follower check, which receives no response - // TODO unnecessary if notified of disconnection - defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) - // wait for the leader to check its followers - + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) - // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for the removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY - )); - + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } else { + cluster.stabilise(Math.max( + // Each follower may have just sent a leader check, which receives no response + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) + // then wait for the follower to check the leader + + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY + // then wait for a new election + + DEFAULT_ELECTION_DELAY + // then wait for the old leader's removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + + // ALSO the leader may have just sent a follower check, which receives no response + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + // wait for the leader to check its followers + + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY + // then wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + )); + } assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } @@ -231,26 +238,32 @@ public void testFollowerDisconnectionDetectedQuickly() { final ClusterNode follower = cluster.getAnyNodeExcept(leader); logger.info("--> disconnecting follower {}", follower); follower.disconnect(); - - cluster.stabilise(Math.max( - // the leader may have just sent a follower check, which receives no response - // TODO unnecessary if notified of disconnection - defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) - // wait for the leader to check the follower - + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) - // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for the removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, - - // ALSO the follower may have just sent a leader check, which receives no response - // TODO not necessary if notified of disconnection - defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) - // then wait for the follower to check the leader - + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) - // then wait for the exception response, causing the follower to become a candidate - + DEFAULT_DELAY_VARIABILITY - )); + boolean leaderGetsDisconnectEvent = randomBoolean(); + if (leaderGetsDisconnectEvent) { + logger.info("--> leader {} and follower {} get disconnect event", leader, follower); + leader.onDisconnectEventFrom(follower); + follower.onDisconnectEventFrom(leader); // to turn follower into candidate, which stabilisation asserts at the end + cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } else { + cluster.stabilise(Math.max( + // the leader may have just sent a follower check, which receives no response + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + // wait for the leader to check the follower + + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY + // then wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + + // ALSO the follower may have just sent a leader check, which receives no response + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) + // then wait for the follower to check the leader + + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + // then wait for the exception response, causing the follower to become a candidate + + DEFAULT_DELAY_VARIABILITY + )); + } assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); } @@ -372,25 +385,25 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() { } public void testAckListenerReceivesNacksIfLeaderStandsDown() { - // TODO: needs support for handling disconnects -// final Cluster cluster = new Cluster(3); -// cluster.runRandomly(); -// cluster.stabilise(); -// final ClusterNode leader = cluster.getAnyLeader(); -// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); -// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); -// -// leader.partition(); -// follower0.coordinator.handleDisconnectedNode(leader.localNode); -// follower1.coordinator.handleDisconnectedNode(leader.localNode); -// cluster.runUntil(cluster.getCurrentTimeMillis() + cluster.DEFAULT_ELECTION_TIME); -// AckCollector ackCollector = leader.submitRandomValue(); -// cluster.runUntil(cluster.currentTimeMillis + Cluster.DEFAULT_DELAY_VARIABILITY); -// leader.connectionStatus = ConnectionStatus.CONNECTED; -// cluster.stabilise(cluster.DEFAULT_STABILISATION_TIME, 0L); -// assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); -// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); -// assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1)); + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + + leader.blackhole(); + follower0.onDisconnectEventFrom(leader); + follower1.onDisconnectEventFrom(leader); + cluster.runFor(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_ELECTION_DELAY, "elect new leader"); + leader.heal(); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.runFor(DEFAULT_DELAY_VARIABILITY, "start publishing"); + cluster.stabilise(); + assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); + assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); + assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1)); } public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { @@ -814,11 +827,15 @@ ClusterNode getAnyNode() { } ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { + return randomFrom(getAllNodesExcept(clusterNodes)); + } + + List getAllNodesExcept(ClusterNode... clusterNodes) { Set forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet()); List acceptableNodes = this.clusterNodes.stream().filter(n -> forbiddenIds.contains(n.getId()) == false).collect(Collectors.toList()); assert acceptableNodes.isEmpty() == false; - return randomFrom(acceptableNodes); + return acceptableNodes; } ClusterNode getAnyNodePreferringLeaders() { @@ -1031,6 +1048,10 @@ boolean blackhole() { return blackholed; } + void onDisconnectEventFrom(ClusterNode clusterNode) { + transportService.disconnectFromNode(clusterNode.localNode); + } + ClusterState getLastAppliedClusterState() { return clusterApplier.lastAppliedClusterState; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 452ff3cdf8e37..c3f2c6abec0e3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -51,6 +52,7 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; @@ -93,7 +95,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { assert false : fcr; - }, node -> { + }, (node, reason) -> { assert false : node; }); @@ -215,12 +217,63 @@ public Empty get() { * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); } - public void testFailsNodeThatDisconnects() { + public void testFailsNodeThatIsDisconnected() { testBehaviourOfFailingNode(Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); }, 0); } + public void testFailsNodeThatDisconnects() { + final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + + final MockTransport mockTransport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + assertFalse(node.equals(localNode)); + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + return; + } + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + handleResponse(requestId, Empty.INSTANCE); + } + + @Override + public String toString() { + return "sending response to [" + action + "][" + requestId + "] from " + node; + } + }); + } + }; + + final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final AtomicBoolean nodeFailed = new AtomicBoolean(); + + final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { + assert false : fcr; + }, (node, reason) -> { + assertTrue(nodeFailed.compareAndSet(false, true)); + }); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); + followersChecker.setCurrentNodes(discoveryNodes); + + transportService.connectToNode(otherNode); + transportService.disconnectFromNode(otherNode); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(nodeFailed.get()); + assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); + } + private void testBehaviourOfFailingNode(Settings testSettings, Supplier responder, long expectedFailureTime) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); @@ -266,7 +319,7 @@ public String toString() { final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { assert false : fcr; - }, node -> { + }, (node, reason) -> { assertTrue(nodeFailed.compareAndSet(false, true)); }); @@ -357,7 +410,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, node -> { + }, (node, reason) -> { assert false : node; }); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 60702c9eab119..afe34e5c16169 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -21,10 +21,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; @@ -47,6 +47,7 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -145,7 +146,8 @@ public String toString() { () -> assertTrue(leaderFailed.compareAndSet(false, true))); logger.info("--> creating first checker"); - try (Releasable ignored = leaderChecker.startLeaderChecker(leader1)) { + leaderChecker.updateLeader(leader1); + { final long maxCheckCount = randomLongBetween(2, 1000); logger.info("--> checking that no failure is detected in {} checks", maxCheckCount); while (checkCount.get() < maxCheckCount) { @@ -153,13 +155,15 @@ public String toString() { deterministicTaskQueue.advanceTime(); } } + leaderChecker.updateLeader(null); logger.info("--> running remaining tasks"); deterministicTaskQueue.runAllTasks(); assertFalse(leaderFailed.get()); logger.info("--> creating second checker"); - try (Releasable ignored = leaderChecker.startLeaderChecker(leader2)) { + leaderChecker.updateLeader(leader2); + { checkCount.set(0); final long maxCheckCount = randomLongBetween(2, 1000); logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount); @@ -184,6 +188,7 @@ public String toString() { + leaderCheckTimeoutMillis // needed because a successful check response might be in flight at the time of failure )); } + leaderChecker.updateLeader(null); } enum Response { @@ -201,6 +206,10 @@ public void testFollowerFailsImmediatelyOnDisconnection() { final MockTransport mockTransport = new MockTransport() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + return; + } assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); assertTrue(node.equals(leader)); final Response response = responseHolder[0]; @@ -237,7 +246,8 @@ public String toString() { final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> assertTrue(leaderFailed.compareAndSet(false, true))); - try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) { + leaderChecker.updateLeader(leader); + { while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.advanceTime(); @@ -253,12 +263,14 @@ public String toString() { assertTrue(leaderFailed.get()); } + leaderChecker.updateLeader(null); deterministicTaskQueue.runAllTasks(); leaderFailed.set(false); responseHolder[0] = Response.SUCCESS; - try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) { + leaderChecker.updateLeader(leader); + { while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.advanceTime(); @@ -274,6 +286,19 @@ public String toString() { assertTrue(leaderFailed.get()); } + + deterministicTaskQueue.runAllTasks(); + leaderFailed.set(false); + responseHolder[0] = Response.SUCCESS; + + leaderChecker.updateLeader(leader); + { + transportService.connectToNode(leader); // need to connect first for disconnect to have any effect + + transportService.disconnectFromNode(leader); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(leaderFailed.get()); + } } public void testLeaderBehaviour() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index d642798d688ae..184c8294d5813 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.transport; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; @@ -34,6 +33,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.CloseableConnection; import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RemoteTransportException; @@ -158,7 +158,7 @@ public void handleError(final long requestId, final TransportException e) { @Override public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) { - return new Connection() { + return new CloseableConnection() { @Override public DiscoveryNode getNode() { return node; @@ -170,19 +170,6 @@ public void sendRequest(long requestId, String action, TransportRequest request, requests.put(requestId, Tuple.tuple(node, action)); onSendRequest(requestId, action, request, node); } - - @Override - public void addCloseListener(ActionListener listener) { - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() { - } }; } From b0df401c076f074825b66fd70b1b73e7159717c8 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Oct 2018 16:54:01 +0200 Subject: [PATCH 2/9] redundant --- .../org/elasticsearch/cluster/coordination/LeaderChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 60fa88f5acc90..2ed9e5de918e0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -112,7 +112,7 @@ public DiscoveryNode leader() { * @param leader the node to be checked as leader, or null if checks should be disabled */ public void updateLeader(@Nullable final DiscoveryNode leader) { - assert leader == null || transportService.getLocalNode().equals(leader) == false; + assert transportService.getLocalNode().equals(leader) == false; final CheckScheduler checkScheduler; if (leader != null) { checkScheduler = new CheckScheduler(leader); From 45187229ceb81b1c079ea6681136ff605381dc7b Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Oct 2018 16:57:10 +0200 Subject: [PATCH 3/9] they pay me by line of code --- .../coordination/CoordinatorTests.java | 133 ++++++++++-------- 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 1b922e1e025a7..019f2af5bd408 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -144,7 +144,7 @@ public void testNodesJoinAfterStableCluster() { assertEquals(currentTerm, newTerm); } - public void testLeaderDisconnectionDetectedQuickly() { + public void testLeaderDisconnectionWithDisconnectEventDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); cluster.stabilise(); @@ -152,38 +152,45 @@ public void testLeaderDisconnectionDetectedQuickly() { final ClusterNode originalLeader = cluster.getAnyLeader(); logger.info("--> disconnecting leader {}", originalLeader); originalLeader.disconnect(); - boolean followersGetDisconnectEvent = randomBoolean(); - if (followersGetDisconnectEvent) { - logger.info("--> followers get disconnect event for leader {} ", originalLeader); - cluster.getAllNodesExcept(originalLeader).forEach(cn -> cn.onDisconnectEventFrom(originalLeader)); - // turn leader into candidate, which stabilisation asserts at the end - cluster.getAllNodesExcept(originalLeader).forEach(cn -> originalLeader.onDisconnectEventFrom(cn)); - cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + logger.info("--> followers get disconnect event for leader {} ", originalLeader); + cluster.getAllNodesExcept(originalLeader).forEach(cn -> cn.onDisconnectEventFrom(originalLeader)); + // turn leader into candidate, which stabilisation asserts at the end + cluster.getAllNodesExcept(originalLeader).forEach(cn -> originalLeader.onDisconnectEventFrom(cn)); + cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_ELECTION_DELAY + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); + } + + public void testLeaderDisconnectionWithoutDisconnectEventDetectedQuickly() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode originalLeader = cluster.getAnyLeader(); + logger.info("--> disconnecting leader {}", originalLeader); + originalLeader.disconnect(); + cluster.stabilise(Math.max( + // Each follower may have just sent a leader check, which receives no response + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) + // then wait for the follower to check the leader + + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY + // then wait for a new election + DEFAULT_ELECTION_DELAY - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); - } else { - cluster.stabilise(Math.max( - // Each follower may have just sent a leader check, which receives no response - defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) - // then wait for the follower to check the leader - + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) - // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for a new election - + DEFAULT_ELECTION_DELAY - // then wait for the old leader's removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, - - // ALSO the leader may have just sent a follower check, which receives no response - defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) - // wait for the leader to check its followers - + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) - // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for the removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY - )); - } + // then wait for the old leader's removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + + // ALSO the leader may have just sent a follower check, which receives no response + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + // wait for the leader to check its followers + + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY + // then wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + )); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } @@ -229,7 +236,7 @@ public void testUnresponsiveLeaderDetectedEventually() { assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } - public void testFollowerDisconnectionDetectedQuickly() { + public void testFollowerDisconnectionWithDisconnectEventDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); cluster.stabilise(); @@ -238,32 +245,40 @@ public void testFollowerDisconnectionDetectedQuickly() { final ClusterNode follower = cluster.getAnyNodeExcept(leader); logger.info("--> disconnecting follower {}", follower); follower.disconnect(); - boolean leaderGetsDisconnectEvent = randomBoolean(); - if (leaderGetsDisconnectEvent) { - logger.info("--> leader {} and follower {} get disconnect event", leader, follower); - leader.onDisconnectEventFrom(follower); - follower.onDisconnectEventFrom(leader); // to turn follower into candidate, which stabilisation asserts at the end - cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); - } else { - cluster.stabilise(Math.max( - // the leader may have just sent a follower check, which receives no response - defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) - // wait for the leader to check the follower - + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) - // then wait for the exception response - + DEFAULT_DELAY_VARIABILITY - // then wait for the removal to be committed - + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, - - // ALSO the follower may have just sent a leader check, which receives no response - defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) - // then wait for the follower to check the leader - + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) - // then wait for the exception response, causing the follower to become a candidate - + DEFAULT_DELAY_VARIABILITY - )); - } + logger.info("--> leader {} and follower {} get disconnect event", leader, follower); + leader.onDisconnectEventFrom(follower); + follower.onDisconnectEventFrom(leader); // to turn follower into candidate, which stabilisation asserts at the end + cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); + } + + public void testFollowerDisconnectionWithoutDisconnectEventDetectedQuickly() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower = cluster.getAnyNodeExcept(leader); + logger.info("--> disconnecting follower {}", follower); + follower.disconnect(); + cluster.stabilise(Math.max( + // the leader may have just sent a follower check, which receives no response + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + // wait for the leader to check the follower + + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY + // then wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + + // ALSO the follower may have just sent a leader check, which receives no response + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) + // then wait for the follower to check the leader + + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + // then wait for the exception response, causing the follower to become a candidate + + DEFAULT_DELAY_VARIABILITY + )); assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); } From f196a2a723ac98ede0573f86e97dbde5009f6023 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Oct 2018 17:18:47 +0200 Subject: [PATCH 4/9] add proper reason when removing nodes --- .../cluster/coordination/FollowersChecker.java | 11 +++++++---- .../cluster/coordination/FollowersCheckerTests.java | 10 ++++++++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index 43f4a3c054276..68dcecede7e05 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -240,7 +240,7 @@ private void handleDisconnectedNode(DiscoveryNode discoveryNode) { synchronized (mutex) { FollowerChecker followerChecker = followerCheckers.get(discoveryNode); if (followerChecker != null && followerChecker.running()) { - followerChecker.failNode(); + followerChecker.failNode("disconnected"); } } } @@ -320,18 +320,21 @@ public void handleException(TransportException exp) { failureCountSinceLastSuccess++; + final String reason; if (failureCountSinceLastSuccess >= followerCheckRetryCount) { logger.debug(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp); + reason = "followers check retry count exceeded"; } else if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp); + reason = "disconnected"; } else { logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp); scheduleNextWakeUp(); return; } - failNode(); + failNode(reason); } @@ -342,7 +345,7 @@ public String executor() { }); } - void failNode() { + void failNode(String reason) { transportService.getThreadPool().generic().execute(new Runnable() { @Override public void run() { @@ -354,7 +357,7 @@ public void run() { faultyNodes.add(discoveryNode); followerCheckers.remove(discoveryNode); } - onNodeFailure.accept(discoveryNode, "followers_checker"); + onNodeFailure.accept(discoveryNode, reason); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index c3f2c6abec0e3..08bd6be381691 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -165,6 +165,7 @@ public void testFailsNodeThatDoesNotRespond() { final Settings settings = settingsBuilder.build(); testBehaviourOfFailingNode(settings, () -> null, + "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis() + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis()); } @@ -182,6 +183,7 @@ public void testFailsNodeThatRejectsCheck() { testBehaviourOfFailingNode(settings, () -> { throw new ElasticsearchException("simulated exception"); }, + "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); } @@ -213,6 +215,7 @@ public Empty get() { throw new ElasticsearchException("simulated exception"); } }, + "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); } @@ -220,7 +223,7 @@ public Empty get() { public void testFailsNodeThatIsDisconnected() { testBehaviourOfFailingNode(Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); - }, 0); + }, "disconnected", 0); } public void testFailsNodeThatDisconnects() { @@ -262,6 +265,7 @@ public String toString() { assert false : fcr; }, (node, reason) -> { assertTrue(nodeFailed.compareAndSet(false, true)); + assertThat(reason, equalTo("disconnected")); }); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -274,7 +278,8 @@ public String toString() { assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); } - private void testBehaviourOfFailingNode(Settings testSettings, Supplier responder, long expectedFailureTime) { + private void testBehaviourOfFailingNode(Settings testSettings, Supplier responder, String failureReason, + long expectedFailureTime) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build(); @@ -321,6 +326,7 @@ public String toString() { assert false : fcr; }, (node, reason) -> { assertTrue(nodeFailed.compareAndSet(false, true)); + assertThat(reason, equalTo(failureReason)); }); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); From 65890273424ab0000f50ee5a27d739fca9540420 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Oct 2018 17:21:46 +0200 Subject: [PATCH 5/9] move assertion --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 019f2af5bd408..334b5d2f88432 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -842,14 +842,15 @@ ClusterNode getAnyNode() { } ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { - return randomFrom(getAllNodesExcept(clusterNodes)); + List filteredNodes = getAllNodesExcept(clusterNodes); + assert filteredNodes.isEmpty() == false; + return randomFrom(filteredNodes); } List getAllNodesExcept(ClusterNode... clusterNodes) { Set forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet()); List acceptableNodes = this.clusterNodes.stream().filter(n -> forbiddenIds.contains(n.getId()) == false).collect(Collectors.toList()); - assert acceptableNodes.isEmpty() == false; return acceptableNodes; } From cb123e516aeab05efdecdbda224087f19dab5803 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Oct 2018 17:23:33 +0200 Subject: [PATCH 6/9] fold into stabilize --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 334b5d2f88432..833b7a444d896 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -414,7 +414,6 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() { + DEFAULT_ELECTION_DELAY, "elect new leader"); leader.heal(); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runFor(DEFAULT_DELAY_VARIABILITY, "start publishing"); cluster.stabilise(); assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); From 8022cb945a7d0f704acfecce4ee705dcebe97d8f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Oct 2018 17:36:49 +0200 Subject: [PATCH 7/9] add comment --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 833b7a444d896..9e494f934aee3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -410,6 +410,7 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() { leader.blackhole(); follower0.onDisconnectEventFrom(leader); follower1.onDisconnectEventFrom(leader); + // let followers elect a leader among themselves before healing the leader and running the publication cluster.runFor(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + DEFAULT_ELECTION_DELAY, "elect new leader"); leader.heal(); From 7781a4494135c653ea10d464bc70231569dccdc9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 22 Oct 2018 10:03:32 +0200 Subject: [PATCH 8/9] wait for reconfiguration --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 0b1c71f9262d8..734bd6daf0551 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -502,6 +502,8 @@ public void testFollowerDisconnectionDetectedQuickly() { leader.onDisconnectEventFrom(follower); follower.onDisconnectEventFrom(leader); // to turn follower into candidate, which stabilisation asserts at the end cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); } From 8d2c394800429bc43dbf5de24b05e63b2429c20d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 22 Oct 2018 13:52:19 +0200 Subject: [PATCH 9/9] add comments --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 734bd6daf0551..84ba9d06e362c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -676,9 +676,10 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() { // let followers elect a leader among themselves before healing the leader and running the publication cluster.runFor(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + DEFAULT_ELECTION_DELAY, "elect new leader"); + // cluster has two nodes in mode LEADER, in different terms ofc, and the one in the lower term won’t be able to publish anything leader.heal(); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.stabilise(); + cluster.stabilise(); // TODO: check if can find a better bound here assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));