From aa41f6e1092b0808e9a1b0950c35292815474742 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 14:56:40 +0100 Subject: [PATCH 01/32] Introduce runRandomly --- .../coordination/CoordinatorTests.java | 123 +++++++++++++++++- .../coordination/DeterministicTaskQueue.java | 7 + .../DeterministicTaskQueueTests.java | 2 + 3 files changed, 130 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4b72b029dfc7d..612ca92f89666 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -52,8 +52,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -65,6 +67,7 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; +import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING; @@ -88,6 +91,7 @@ public class CoordinatorTests extends ESTestCase { public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -106,6 +110,7 @@ public void testCanUpdateClusterStateAfterStabilisation() { public void testNodesJoinAfterStableCluster() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); @@ -125,6 +130,7 @@ public void testNodesJoinAfterStableCluster() { public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -153,13 +159,14 @@ public void testLeaderDisconnectionDetectedQuickly() { + DEFAULT_DELAY_VARIABILITY // then wait for the removal to be committed + DEFAULT_CLUSTER_STATE_UPDATE_DELAY - )); + )); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } public void testUnresponsiveLeaderDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -189,6 +196,7 @@ public void testUnresponsiveLeaderDetectedEventually() { public void testFollowerDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -220,6 +228,7 @@ public void testFollowerDisconnectionDetectedQuickly() { public void testUnresponsiveFollowerDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -289,6 +298,7 @@ private static String nodeIdFromIndex(int nodeIndex) { class Cluster { + static final long EXTREME_DELAY_VARIABILITY = 10000L; static final long DEFAULT_DELAY_VARIABILITY = 100L; final List clusterNodes; @@ -299,6 +309,7 @@ class Cluster { private final Set disconnectedNodes = new HashSet<>(); private final Set blackholedNodes = new HashSet<>(); + private final Map committedStatesByVersion = new HashMap<>(); Cluster(int initialNodeCount) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); @@ -328,6 +339,100 @@ void addNodes(int newNodesCount) { } } + void runRandomly() { + + final int randomSteps = scaledRandomIntBetween(10, 10000); + logger.info("--> start of safety phase of at least [{}] steps", randomSteps); + + deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); + int step = 0; + long finishTime = -1; + + while (finishTime == -1 || deterministicTaskQueue.getCurrentTimeMillis() <= finishTime) { + step++; + if (randomSteps <= step && finishTime == -1) { + finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); + deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); + logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}]", step, finishTime); + } + + try { + if (rarely()) { + final ClusterNode clusterNode = getAnyNodePreferringLeaders(); + final int newValue = randomInt(); + logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", step, newValue, clusterNode.getId()); + clusterNode.submitValue(newValue); + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + logger.debug("----> [runRandomly {}] forcing {} to become candidate", step, clusterNode.getId()); + synchronized (clusterNode.coordinator.mutex) { + clusterNode.coordinator.becomeCandidate("runRandomly"); + } + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + final String id = clusterNode.getId(); + disconnectedNodes.remove(id); + blackholedNodes.remove(id); + + switch (randomInt(2)) { + case 0: + logger.debug("----> [runRandomly {}] connecting", step, id); + break; + case 1: + logger.debug("----> [runRandomly {}] disconnecting", step, id); + disconnectedNodes.add(id); + break; + case 2: + logger.debug("----> [runRandomly {}] blackholing", step, id); + blackholedNodes.add(id); + break; + } + } else { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + + // TODO other random steps: + // - reboot a node + // - abdicate leadership + // - bootstrap + + } catch (CoordinationStateRejectedException ignored) { + // This is ok: it just means a message couldn't currently be handled. + } + + assertConsistentStates(); + } + + disconnectedNodes.clear(); + blackholedNodes.clear(); + } + + private void assertConsistentStates() { + for (final ClusterNode clusterNode : clusterNodes) { + clusterNode.coordinator.invariant(); + } + updateCommittedStates(); + } + + private void updateCommittedStates() { + for (final ClusterNode clusterNode : clusterNodes) { + Optional committedState = clusterNode.coordinator.getLastCommittedState(); + if (committedState.isPresent()) { + ClusterState storedState = committedStatesByVersion.get(committedState.get().getVersion()); + if (storedState == null) { + committedStatesByVersion.put(committedState.get().getVersion(), committedState.get()); + } else { + assertEquals("expected " + committedState.get() + " but got " + storedState, + value(committedState.get()), value(storedState)); + } + } + } + } + void stabilise() { stabilise(DEFAULT_STABILISATION_TIME); } @@ -428,6 +533,10 @@ private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode return connectionStatus; } + ClusterNode getAnyNode() { + return getAnyNodeExcept(); + } + ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { Set forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet()); List acceptableNodes @@ -436,6 +545,16 @@ ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { return randomFrom(acceptableNodes); } + ClusterNode getAnyNodePreferringLeaders() { + for (int i = 0; i < 3; i++) { + ClusterNode clusterNode = getAnyNode(); + if (clusterNode.coordinator.getMode() == LEADER) { + return clusterNode; + } + } + return getAnyNode(); + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; @@ -538,7 +657,7 @@ DiscoveryNode getLocalNode() { } boolean isLeader() { - return coordinator.getMode() == Coordinator.Mode.LEADER; + return coordinator.getMode() == LEADER; } void submitValue(final long value) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 2bebc1ab244c0..4e46137c2de55 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -50,6 +50,7 @@ public class DeterministicTaskQueue extends AbstractComponent { private long currentTimeMillis; private long nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; private long executionDelayVariabilityMillis; + private long latestDeferredExecutionTime; public DeterministicTaskQueue(Settings settings, Random random) { super(settings); @@ -149,6 +150,7 @@ public void scheduleAt(final long executionTimeMillis, final Runnable task) { private void scheduleDeferredTask(DeferredTask deferredTask) { nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis()); + latestDeferredExecutionTime = Math.max(latestDeferredExecutionTime, deferredTask.getExecutionTimeMillis()); deferredTasks.add(deferredTask); } @@ -161,6 +163,7 @@ public void advanceTime() { logger.trace("advanceTime: from [{}ms] to [{}ms]", currentTimeMillis, nextDeferredTaskExecutionTimeMillis); currentTimeMillis = nextDeferredTaskExecutionTimeMillis; + assert currentTimeMillis <= latestDeferredExecutionTime : latestDeferredExecutionTime + " < " + currentTimeMillis; nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; List remainingDeferredTasks = new ArrayList<>(); @@ -418,6 +421,10 @@ public ScheduledExecutorService scheduler() { }; } + public long getLatestDeferredExecutionTime() { + return latestDeferredExecutionTime; + } + private static class DeferredTask { private final long executionTimeMillis; private final Runnable task; diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index 8c341a7710ef9..f3636460e55b6 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -366,10 +366,12 @@ public void testDelayVariabilityAppliesToFutureTasks() { for (int i = 0; i < 100; i++) { deterministicTaskQueue.scheduleAt(delayMillis, () -> {}); } + final long expectedEndTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); final long startTime = deterministicTaskQueue.getCurrentTimeMillis(); deterministicTaskQueue.runAllTasks(); final long elapsedTime = deterministicTaskQueue.getCurrentTimeMillis() - startTime; + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), is(expectedEndTime)); assertThat(elapsedTime, greaterThan(delayMillis)); // fails with negligible probability assertThat(elapsedTime, lessThanOrEqualTo(delayMillis + variabilityMillis)); } From 24b4a5490eb29bf7baba566a9ad74983f1f0d4a5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 15:29:25 +0100 Subject: [PATCH 02/32] Ignore publications from self if no longer leading --- .../cluster/coordination/Coordinator.java | 27 ++++++++++++------- .../coordination/PreVoteCollector.java | 2 ++ .../util/concurrent/ListenableFuture.java | 2 +- .../elasticsearch/discovery/PeerFinder.java | 4 +-- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 72e626399878e..9df382caa5537 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; + public class Coordinator extends AbstractLifecycleComponent implements Discovery { // the timeout for the publication of each value @@ -143,7 +145,7 @@ public String toString() { private void onFollowerFailure(DiscoveryNode discoveryNode) { synchronized (mutex) { - if (mode == Mode.LEADER) { + if (mode == LEADER) { masterService.submitStateUpdateTask("node-left", new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"), ClusterStateTaskConfig.build(Priority.IMMEDIATE), @@ -184,13 +186,18 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { synchronized (mutex) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); - ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); - final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); - if (sourceNode.equals(getLocalNode()) == false) { + if (sourceNode.equals(getLocalNode())) { + if (mode != LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term()) { + throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); + } + } else { becomeFollower("handlePublishRequest", sourceNode); } + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); + final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); + return new PublishWithJoinResponse(publishResponse, joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); } @@ -309,7 +316,7 @@ void becomeLeader(String method) { assert mode == Mode.CANDIDATE : "expected candidate but was " + mode; logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); - mode = Mode.LEADER; + mode = LEADER; joinAccumulator.close(mode); joinAccumulator = joinHelper.new LeaderJoinAccumulator(); @@ -419,7 +426,7 @@ public void invariant() { assert peerFinder.getCurrentTerm() == getCurrentTerm(); assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); - if (mode == Mode.LEADER) { + if (mode == LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); assert coordinationState.get().electionWon(); @@ -509,7 +516,7 @@ ClusterState getStateForMasterService() { // expose last accepted cluster state as base state upon which the master service // speculatively calculates the next cluster state update final ClusterState clusterState = coordinationState.get().getLastAcceptedState(); - if (mode != Mode.LEADER || clusterState.term() != getCurrentTerm()) { + if (mode != LEADER || clusterState.term() != getCurrentTerm()) { // the master service checks if the local node is the master node in order to fail execution of the state update early return clusterStateWithNoMasterBlock(clusterState); } @@ -538,7 +545,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener new ParameterizedMessage("[{}] failed publication as not currently leading", clusterChangedEvent.source())); publishListener.onFailure(new FailedToCommitClusterStateException("node stepped down as leader during publication")); @@ -600,7 +607,7 @@ protected void onCompletion(boolean committed) { currentPublication = Optional.empty(); updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication - localNodeAckEvent.addListener(new ActionListener() { + localNodeAckEvent.addListener(wrapWithMutex(new ActionListener() { @Override public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; @@ -628,7 +635,7 @@ public void onFailure(Exception e) { ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. publishListener.onFailure(exception); } - }, transportService.getThreadPool().generic()); + }), transportService.getThreadPool().generic()); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index c3417c17f8f53..a1064359f03c2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -200,6 +200,8 @@ public String toString() { ", electionStarted=" + electionStarted + ", preVoteRequest=" + preVoteRequest + ", isClosed=" + isClosed + + ", clusterState.term=" + clusterState.term() + + ", clusterState.version=" + clusterState.version() + '}'; } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index 5fb8e9517b26e..971fa1f40acde 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -82,7 +82,7 @@ protected synchronized void done() { private void notifyListener(ActionListener listener, ExecutorService executorService) { try { - executorService.submit(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { try { diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 028519cc1383f..56ab8f6ce3bff 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -360,9 +360,9 @@ public void onFailure(Exception e) { }); } - private void removePeer() { + void removePeer() { final Peer removed = peersByAddress.remove(transportAddress); - assert removed == Peer.this; + assert removed == Peer.this : removed + " != " + Peer.this; } private void requestPeers() { From 29cab382b7d8dd8d747a58b354ca13da96149f71 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 15:46:26 +0100 Subject: [PATCH 03/32] Must become follower after successfully processing publish request --- .../cluster/coordination/Coordinator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 9df382caa5537..483620459e4a5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -187,17 +187,17 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); - if (sourceNode.equals(getLocalNode())) { - if (mode != LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term()) { - throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); - } - } else { - becomeFollower("handlePublishRequest", sourceNode); + if (sourceNode.equals(getLocalNode()) && (mode != LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { + throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); } ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); + if (sourceNode.equals(getLocalNode()) == false) { + becomeFollower("handlePublishRequest", sourceNode); + } + return new PublishWithJoinResponse(publishResponse, joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); } From 289d39fb57e5a0aec218424fef85b2d1e9588aab Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 15:46:40 +0100 Subject: [PATCH 04/32] Fix log messages --- .../cluster/coordination/CoordinatorTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 612ca92f89666..f773adb344fb8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -376,14 +376,14 @@ void runRandomly() { switch (randomInt(2)) { case 0: - logger.debug("----> [runRandomly {}] connecting", step, id); + logger.debug("----> [runRandomly {}] connecting {}", step, id); break; case 1: - logger.debug("----> [runRandomly {}] disconnecting", step, id); + logger.debug("----> [runRandomly {}] disconnecting {}", step, id); disconnectedNodes.add(id); break; case 2: - logger.debug("----> [runRandomly {}] blackholing", step, id); + logger.debug("----> [runRandomly {}] blackholing {}", step, id); blackholedNodes.add(id); break; } From a5567845fb9d6ef979b250895c60085cb2f2601f Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 15:46:50 +0100 Subject: [PATCH 05/32] Must guard peer removal --- .../main/java/org/elasticsearch/discovery/PeerFinder.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 56ab8f6ce3bff..34cf2a38997a2 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -355,6 +355,11 @@ public void onResponse(DiscoveryNode remoteNode) { @Override public void onFailure(Exception e) { logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e); + synchronized (mutex) { + if (active == false) { + return; + } + } removePeer(); } }); From 91b03148201c30b446fee563f0234d030f84231e Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 16:21:57 +0100 Subject: [PATCH 06/32] Timeout RequestPeersRequests --- .../org/elasticsearch/discovery/PeerFinder.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 34cf2a38997a2..859bdcec8a74e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -59,7 +60,12 @@ public abstract class PeerFinder extends AbstractComponent { Setting.timeSetting("discovery.find_peers_interval", TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); - private final TimeValue findPeersDelay; + public static final Setting DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = + Setting.timeSetting("discovery.request_peers_timeout", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + private final TimeValue findPeersInterval; + private final TimeValue requestPeersTimeout; private final Object mutex = new Object(); private final TransportService transportService; @@ -75,7 +81,8 @@ public abstract class PeerFinder extends AbstractComponent { public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { super(settings); - findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); + findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); + requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); this.transportService = transportService; this.transportAddressConnector = transportAddressConnector; this.configuredHostsResolver = configuredHostsResolver; @@ -241,7 +248,7 @@ private boolean handleWakeUp() { } }); - transportService.getThreadPool().schedule(findPeersDelay, Names.GENERIC, new AbstractRunnable() { + transportService.getThreadPool().schedule(findPeersInterval, Names.GENERIC, new AbstractRunnable() { @Override public boolean isForceExecution() { return true; @@ -385,6 +392,7 @@ private void requestPeers() { transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(getLocalNode(), knownNodes), + TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(), new TransportResponseHandler() { @Override From 19bd1ae73785bc97fdf223aa8d7637234a22547a Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 16:58:55 +0100 Subject: [PATCH 07/32] Reset port counter more frequently --- .../cluster/coordination/CoordinatorTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index f773adb344fb8..4a22c6f775390 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -89,6 +90,11 @@ @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { + @Before + public void resetPortCounterBeforeEachTest() { + resetPortCounter(); + } + public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); From 4d717b993ab656554cc1f006520b5208f60eb0e7 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 16:59:21 +0100 Subject: [PATCH 08/32] Remove failing assertion for now --- .../src/main/java/org/elasticsearch/discovery/PeerFinder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 859bdcec8a74e..0cb6e68e1d57e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -374,7 +374,9 @@ public void onFailure(Exception e) { void removePeer() { final Peer removed = peersByAddress.remove(transportAddress); - assert removed == Peer.this : removed + " != " + Peer.this; + // assert removed == Peer.this : removed + " != " + Peer.this; + // ^ This assertion sometimes trips if we are deactivated and reactivated while a request is in flight. + // TODO be more careful about avoiding multiple active Peer objects for each address } private void requestPeers() { From 2b9883e18e828663cd2ebc10c9864e3382b6cd02 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 17:18:34 +0100 Subject: [PATCH 09/32] Add comment describing why we reject a publication here --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 483620459e4a5..930209727ee4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -188,6 +188,8 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); if (sourceNode.equals(getLocalNode()) && (mode != LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { + // Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication + // is already failed so there is no point in proceeding. throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); } From c4ffe2274c13a0247684d901ed1ce4ace310d61a Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 17:25:29 +0100 Subject: [PATCH 10/32] Re-qualify mode --- .../cluster/coordination/Coordinator.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 930209727ee4a..d47da4f77f552 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -60,8 +60,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; - public class Coordinator extends AbstractLifecycleComponent implements Discovery { // the timeout for the publication of each value @@ -145,7 +143,7 @@ public String toString() { private void onFollowerFailure(DiscoveryNode discoveryNode) { synchronized (mutex) { - if (mode == LEADER) { + if (mode == Mode.LEADER) { masterService.submitStateUpdateTask("node-left", new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"), ClusterStateTaskConfig.build(Priority.IMMEDIATE), @@ -187,7 +185,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); - if (sourceNode.equals(getLocalNode()) && (mode != LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { + if (sourceNode.equals(getLocalNode()) && (mode != Mode.LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { // Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication // is already failed so there is no point in proceeding. throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); @@ -318,7 +316,7 @@ void becomeLeader(String method) { assert mode == Mode.CANDIDATE : "expected candidate but was " + mode; logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); - mode = LEADER; + mode = Mode.LEADER; joinAccumulator.close(mode); joinAccumulator = joinHelper.new LeaderJoinAccumulator(); @@ -428,7 +426,7 @@ public void invariant() { assert peerFinder.getCurrentTerm() == getCurrentTerm(); assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); - if (mode == LEADER) { + if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); assert coordinationState.get().electionWon(); @@ -518,7 +516,7 @@ ClusterState getStateForMasterService() { // expose last accepted cluster state as base state upon which the master service // speculatively calculates the next cluster state update final ClusterState clusterState = coordinationState.get().getLastAcceptedState(); - if (mode != LEADER || clusterState.term() != getCurrentTerm()) { + if (mode != Mode.LEADER || clusterState.term() != getCurrentTerm()) { // the master service checks if the local node is the master node in order to fail execution of the state update early return clusterStateWithNoMasterBlock(clusterState); } @@ -547,7 +545,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener new ParameterizedMessage("[{}] failed publication as not currently leading", clusterChangedEvent.source())); publishListener.onFailure(new FailedToCommitClusterStateException("node stepped down as leader during publication")); From c66c4b0307f50563c947b88fa5f69ddf6f373514 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 17:25:55 +0100 Subject: [PATCH 11/32] Revert --- .../elasticsearch/cluster/coordination/PreVoteCollector.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index a1064359f03c2..c3417c17f8f53 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -200,8 +200,6 @@ public String toString() { ", electionStarted=" + electionStarted + ", preVoteRequest=" + preVoteRequest + ", isClosed=" + isClosed + - ", clusterState.term=" + clusterState.term() + - ", clusterState.version=" + clusterState.version() + '}'; } From a4d5c74635eb8b32d81f3a40352b5ded3920c39c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 17:32:20 +0100 Subject: [PATCH 12/32] No need for this guard without the affected assertion --- .../main/java/org/elasticsearch/discovery/PeerFinder.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 0cb6e68e1d57e..2beb94c82dfe9 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -362,11 +362,6 @@ public void onResponse(DiscoveryNode remoteNode) { @Override public void onFailure(Exception e) { logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e); - synchronized (mutex) { - if (active == false) { - return; - } - } removePeer(); } }); From f77709704a814f3690ba9fd6a9e3485cbdbfcbfa Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 18:09:11 +0100 Subject: [PATCH 13/32] Add test that PeersRequest has a timeout --- .../discovery/PeerFinderTests.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 0a74300beac58..2e0eb46d7deae 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -663,6 +663,51 @@ public void testDoesNotMakeMultipleConcurrentConnectionAttemptsToOneAddress() { assertFoundPeers(otherNode); } + public void testTimesOutAndRetriesConnectionsToBlackholedNodes() { + final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); + final DiscoveryNode nodeToFind = newDiscoveryNode("node-to-find"); + + providedAddresses.add(otherNode.getAddress()); + transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.reachableNodes.add(nodeToFind); + + peerFinder.activate(lastAcceptedNodes); + + while (true) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); // MockTransportAddressConnector verifies no multiple connection attempts + if (capturingTransport.getCapturedRequestsAndClear().length > 0) { + break; + } + } + + final long timeoutAtMillis = deterministicTaskQueue.getCurrentTimeMillis() + + PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(Settings.EMPTY).millis(); + while (deterministicTaskQueue.getCurrentTimeMillis() < timeoutAtMillis) { + assertFoundPeers(otherNode); + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + + // need to wait for the connection to timeout, then for another wakeup, before discovering the peer + final long expectedTime = timeoutAtMillis + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + + while (deterministicTaskQueue.getCurrentTimeMillis() < expectedTime) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + + respondToRequests(node -> { + assertThat(node, is(otherNode)); + return new PeersResponse(Optional.empty(), singletonList(nodeToFind), randomNonNegativeLong()); + }); + + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + + assertFoundPeers(nodeToFind, otherNode); + } + public void testReconnectsToDisconnectedNodes() { final DiscoveryNode otherNode = newDiscoveryNode("original-node"); providedAddresses.add(otherNode.getAddress()); From 3a7fc1d47b982065967fbe31f77a44d63cd4cd51 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 2 Oct 2018 18:15:03 +0100 Subject: [PATCH 14/32] Line length --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index d47da4f77f552..5ffc4ac319d86 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -185,7 +185,8 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); - if (sourceNode.equals(getLocalNode()) && (mode != Mode.LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { + if (sourceNode.equals(getLocalNode()) + && (mode != Mode.LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { // Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication // is already failed so there is no point in proceeding. throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); From 76c1d054968ebbabdcc49d13fb458c4e6e66d0ba Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 09:46:04 +0100 Subject: [PATCH 15/32] Rework stabilisation assertions - use isConnectedPair rather than looking at disconnected/blackholed sets - don't expect the follower to have a good state (no lag detection) - check that the leader's state is exactly the nodes to which it is connected --- .../coordination/CoordinatorTests.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4a22c6f775390..bbadf5ca92c66 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -500,19 +500,20 @@ private void assertUniqueLeaderAndExpectedModes() { final String nodeId = clusterNode.getId(); - if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) { - assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - } else { + if (isConnectedPair(leader, clusterNode)) { + assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - // TODO assert that all nodes have actually voted for the leader in this term + // TODO assert that this node has actually voted for the leader in this term + // TODO assert that this node's accepted and committed states are the same as the leader's - assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); - assertThat(nodeId + " is at the same accepted version as the leader", - Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion); - assertThat(nodeId + " is at the same committed version as the leader", - clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); - assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), + assertThat(nodeId + " is in the leader's committed state", + leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), equalTo(Optional.of(true))); + } else { + assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertThat(nodeId + " is not in the leader's committed state", + leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), + equalTo(Optional.of(false))); } } From 0b180f05d55c03728843fbc0e89717596d3a42d7 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 13:08:56 +0100 Subject: [PATCH 16/32] Term mismatch is rejected by CoordinatorState --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 5ffc4ac319d86..3b7e5dcfe3a71 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -185,8 +185,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); - if (sourceNode.equals(getLocalNode()) - && (mode != Mode.LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) { + if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) { // Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication // is already failed so there is no point in proceeding. throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); From c14cf7ae25b705445d62650ac2ec2044fd2e0607 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 13:09:59 +0100 Subject: [PATCH 17/32] Register setting --- .../java/org/elasticsearch/common/settings/ClusterSettings.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ae3f1f0fa0d39..632e57dca49e2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -445,6 +445,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndexGraveyard.SETTING_MAX_TOMBSTONES, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, + PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING, ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, From ff33d680bb6cc9e5b6e0abb4742dabce12a32098 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 13:10:11 +0100 Subject: [PATCH 18/32] Reduce default for discovery.request_peers_timeout to 3s --- .../src/main/java/org/elasticsearch/discovery/PeerFinder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 2beb94c82dfe9..3d32762d53281 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -62,7 +62,7 @@ public abstract class PeerFinder extends AbstractComponent { public static final Setting DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting("discovery.request_peers_timeout", - TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); private final TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; From e5bb9b3b72b572b44994b3b496559cd7e020fc16 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 13:18:01 +0100 Subject: [PATCH 19/32] Execute directly, no need for mutex --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3b7e5dcfe3a71..37414d50fc479 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; @@ -607,7 +608,7 @@ protected void onCompletion(boolean committed) { currentPublication = Optional.empty(); updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication - localNodeAckEvent.addListener(wrapWithMutex(new ActionListener() { + localNodeAckEvent.addListener(new ActionListener() { @Override public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; @@ -635,7 +636,7 @@ public void onFailure(Exception e) { ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. publishListener.onFailure(exception); } - }), transportService.getThreadPool().generic()); + }, EsExecutors.newDirectExecutorService()); } @Override From af227e723288e9c0443ca0a756fff530d9c6a24b Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 14:49:29 +0100 Subject: [PATCH 20/32] Only runRandomly if no disruptions in place --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index bbadf5ca92c66..794e56cdf976e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -347,6 +347,9 @@ void addNodes(int newNodesCount) { void runRandomly() { + assert disconnectedNodes.isEmpty() : "may reconnect disconnected nodes, probably unexpected: " + disconnectedNodes; + assert blackholedNodes.isEmpty() : "may reconnect blackholed nodes, probably unexpected: " + blackholedNodes; + final int randomSteps = scaledRandomIntBetween(10, 10000); logger.info("--> start of safety phase of at least [{}] steps", randomSteps); From f8d5458a6e7ed77a2a8b24d1c0aee09cdcd04e77 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 14:49:50 +0100 Subject: [PATCH 21/32] Only log changes to connected state --- .../coordination/CoordinatorTests.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 794e56cdf976e..4e854ddcaa91d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -380,20 +380,31 @@ void runRandomly() { } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); final String id = clusterNode.getId(); - disconnectedNodes.remove(id); - blackholedNodes.remove(id); switch (randomInt(2)) { case 0: - logger.debug("----> [runRandomly {}] connecting {}", step, id); + boolean reconnected = disconnectedNodes.remove(id) | blackholedNodes.remove(id); // NB no short-circuit + if (reconnected) { + logger.debug("----> [runRandomly {}] connecting {}", step, id); + } break; case 1: - logger.debug("----> [runRandomly {}] disconnecting {}", step, id); - disconnectedNodes.add(id); + boolean unBlackholed = blackholedNodes.remove(id); + boolean disconnected = disconnectedNodes.add(id); + if (disconnected) { + logger.debug("----> [runRandomly {}] disconnecting {}", step, id); + } else { + assert unBlackholed == false; + } break; case 2: - logger.debug("----> [runRandomly {}] blackholing {}", step, id); - blackholedNodes.add(id); + boolean unDisconnected = disconnectedNodes.remove(id); + boolean blackholed = blackholedNodes.add(id); + if (blackholed) { + logger.debug("----> [runRandomly {}] blackholing {}", step, id); + } else { + assert unDisconnected == false; + } break; } } else { From 78757f9e7e2a77635ddd99ffc7092bfa6e1c9db1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 14:58:29 +0100 Subject: [PATCH 22/32] Revamp disruption-management logic --- .../coordination/CoordinatorTests.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4e854ddcaa91d..adab0706f78d0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -176,8 +176,8 @@ public void testUnresponsiveLeaderDetectedEventually() { cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); - logger.info("--> partitioning leader {}", originalLeader); - originalLeader.partition(); + logger.info("--> blackholing leader {}", originalLeader); + originalLeader.blackhole(); cluster.stabilise(Math.max( // first wait for all the followers to notice the leader has gone @@ -239,8 +239,8 @@ public void testUnresponsiveFollowerDetectedEventually() { final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower = cluster.getAnyNodeExcept(leader); - logger.info("--> partitioning follower {}", follower); - follower.partition(); + logger.info("--> blackholing follower {}", follower); + follower.blackhole(); cluster.stabilise(Math.max( // wait for the leader to notice that the follower is unresponsive @@ -379,31 +379,21 @@ void runRandomly() { } } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); - final String id = clusterNode.getId(); switch (randomInt(2)) { case 0: - boolean reconnected = disconnectedNodes.remove(id) | blackholedNodes.remove(id); // NB no short-circuit - if (reconnected) { - logger.debug("----> [runRandomly {}] connecting {}", step, id); + if (clusterNode.connect()) { + logger.debug("----> [runRandomly {}] connecting {}", step, clusterNode.getId()); } break; case 1: - boolean unBlackholed = blackholedNodes.remove(id); - boolean disconnected = disconnectedNodes.add(id); - if (disconnected) { - logger.debug("----> [runRandomly {}] disconnecting {}", step, id); - } else { - assert unBlackholed == false; + if (clusterNode.disconnect()) { + logger.debug("----> [runRandomly {}] disconnecting {}", step, clusterNode.getId()); } break; case 2: - boolean unDisconnected = disconnectedNodes.remove(id); - boolean blackholed = blackholedNodes.add(id); - if (blackholed) { - logger.debug("----> [runRandomly {}] blackholing {}", step, id); - } else { - assert unDisconnected == false; + if (clusterNode.blackhole()) { + logger.debug("----> [runRandomly {}] blackholing {}", step, clusterNode.getId()); } break; } @@ -700,12 +690,25 @@ public String toString() { return localNode.toString(); } - void disconnect() { - disconnectedNodes.add(localNode.getId()); + boolean connect() { + boolean unBlackholed = blackholedNodes.remove(localNode.getId()); + boolean unDisconnected = disconnectedNodes.remove(localNode.getId()); + assert unBlackholed == false || unDisconnected == false; + return unBlackholed || unDisconnected; } - void partition() { - blackholedNodes.add(localNode.getId()); + boolean disconnect() { + boolean unBlackholed = blackholedNodes.remove(localNode.getId()); + boolean disconnected = disconnectedNodes.add(localNode.getId()); + assert disconnected || unBlackholed == false; + return disconnected; + } + + boolean blackhole() { + boolean unDisconnected = disconnectedNodes.remove(localNode.getId()); + boolean blackholed = blackholedNodes.add(localNode.getId()); + assert blackholed || unDisconnected == false; + return blackholed; } } From cf7047a83200152b7da46c3e3c026946a3012cf9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 15:28:37 +0100 Subject: [PATCH 23/32] Fix description of publication --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 37414d50fc479..62efa30e4db10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -614,7 +614,7 @@ public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term() && coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version() - : "onPossibleCompletion: term or version mismatch when publishing [" + this + : "onPossibleCompletion: term or version mismatch when publishing [" + publicationDescription() + "]: current version is now [" + coordinationState.get().getLastAcceptedVersion() + "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]"; assert committed; @@ -673,6 +673,10 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app ActionListener responseActionListener) { publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener)); } + + private String publicationDescription() { + return toString(); + } }; assert currentPublication.isPresent() == false From a470ecadec76e1683c53ff5296a44114d764f488 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 15:34:39 +0100 Subject: [PATCH 24/32] Fix order of requestId/action in request description --- .../elasticsearch/test/disruption/DisruptableMockTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index b12596c8b388d..ae584901bf0bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -115,7 +115,7 @@ public String toString() { protected String getRequestDescription(long requestId, String action, DiscoveryNode destination) { return new ParameterizedMessage("[{}][{}] from {} to {}", - action, requestId, getLocalNode(), destination).getFormattedMessage(); + requestId, action, getLocalNode(), destination).getFormattedMessage(); } protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) { From dc2bddc6be8d6533fc4bef742c32d747a96e544c Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 15:46:22 +0100 Subject: [PATCH 25/32] Remove bogus assertion --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 62efa30e4db10..1625cbdff1ea9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -612,11 +612,6 @@ protected void onCompletion(boolean committed) { @Override public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term() - && coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version() - : "onPossibleCompletion: term or version mismatch when publishing [" + publicationDescription() - + "]: current version is now [" + coordinationState.get().getLastAcceptedVersion() - + "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]"; assert committed; // TODO: send to applier From 257eb85be8886455b59331fdb6496d32ccb66a4f Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 16:17:52 +0100 Subject: [PATCH 26/32] No need for this method any more --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 1625cbdff1ea9..f88c7889b0ad0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -668,10 +668,6 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app ActionListener responseActionListener) { publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener)); } - - private String publicationDescription() { - return toString(); - } }; assert currentPublication.isPresent() == false From 1d9a91790e40dc0df82fbfb06feafa219d8a5bf6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 16:17:57 +0100 Subject: [PATCH 27/32] More state in assertion message --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index f88c7889b0ad0..3f3f39cd720dc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -447,7 +447,8 @@ public void invariant() { lastPublishedState.nodes().forEach(lastPublishedNodes::add); assert lastPublishedNodes.remove(getLocalNode()); } - assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers; + assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers + + " [becomingMaster=" + becomingMaster + ", publicationInProgress=" + publicationInProgress() + "]"; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); From 1a58b48433efca7ff6f2f8988dd35f5090f04206 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 16:33:19 +0100 Subject: [PATCH 28/32] Stand down as master in rare condition --- .../elasticsearch/cluster/coordination/Coordinator.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3f3f39cd720dc..19e44a4492b26 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -679,7 +679,13 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app @Override public void run() { synchronized (mutex) { - publication.onTimeout(); + currentPublication.ifPresent(p -> { + if (p == publication && getLastAcceptedState().term() < publishRequest.getAcceptedState().term()) { + becomeCandidate("initial publication timed out before local acceptance"); + } else { + publication.onTimeout(); + } + }); } } From b7a5328a1f5821d25e905be1406a26f9ec50d62a Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 17:40:50 +0100 Subject: [PATCH 29/32] Revert "Stand down as master in rare condition" This reverts commit 1a58b48433efca7ff6f2f8988dd35f5090f04206. --- .../elasticsearch/cluster/coordination/Coordinator.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 19e44a4492b26..3f3f39cd720dc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -679,13 +679,7 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app @Override public void run() { synchronized (mutex) { - currentPublication.ifPresent(p -> { - if (p == publication && getLastAcceptedState().term() < publishRequest.getAcceptedState().term()) { - becomeCandidate("initial publication timed out before local acceptance"); - } else { - publication.onTimeout(); - } - }); + publication.onTimeout(); } } From 34eb71b87b3d1723308dc5540b123f20e9ca9644 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 17:42:32 +0100 Subject: [PATCH 30/32] Move assertion --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3f3f39cd720dc..529eb32c4a537 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -446,9 +446,10 @@ public void invariant() { = currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState()); lastPublishedState.nodes().forEach(lastPublishedNodes::add); assert lastPublishedNodes.remove(getLocalNode()); + assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers + + " [becomingMaster=" + becomingMaster + ", publicationInProgress=" + publicationInProgress() + "]"; + // TODO instead assert that knownFollowers is updated appropriately at the end of each publication } - assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers - + " [becomingMaster=" + becomingMaster + ", publicationInProgress=" + publicationInProgress() + "]"; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); From 98a24f4ae47e6bdd3541322cf729e9455c030f4e Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 18:29:50 +0100 Subject: [PATCH 31/32] Timeout join requests --- .../cluster/coordination/JoinHelper.java | 52 ++++++++++++------- .../common/settings/ClusterSettings.java | 4 +- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 7bcfd7d9fb604..15b274c99f871 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -30,11 +30,14 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; @@ -56,9 +59,15 @@ public class JoinHelper extends AbstractComponent { public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join"; + // the timeout for each join attempt + public static final Setting JOIN_TIMEOUT_SETTING = + Setting.timeSetting("cluster.join.timeout", + TimeValue.timeValueMillis(60000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + private final MasterService masterService; private final TransportService transportService; private final JoinTaskExecutor joinTaskExecutor; + private final TimeValue joinTimeout; final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); @@ -68,6 +77,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master super(settings); this.masterService = masterService; this.transportService = transportService; + this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { @Override @@ -130,29 +140,31 @@ public void sendJoinRequest(DiscoveryNode destination, Optional optionalJo final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { logger.debug("attempting to join {} with {}", destination, joinRequest); - transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler() { - @Override - public Empty read(StreamInput in) { - return Empty.INSTANCE; - } + transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, + TransportRequestOptions.builder().withTimeout(joinTimeout).build(), + new TransportResponseHandler() { + @Override + public Empty read(StreamInput in) { + return Empty.INSTANCE; + } - @Override - public void handleResponse(Empty response) { - pendingOutgoingJoins.remove(dedupKey); - logger.debug("successfully joined {} with {}", destination, joinRequest); - } + @Override + public void handleResponse(Empty response) { + pendingOutgoingJoins.remove(dedupKey); + logger.debug("successfully joined {} with {}", destination, joinRequest); + } - @Override - public void handleException(TransportException exp) { - pendingOutgoingJoins.remove(dedupKey); - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); - } + @Override + public void handleException(TransportException exp) { + pendingOutgoingJoins.remove(dedupKey); + logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + } - @Override - public String executor() { - return Names.SAME; - } - }); + @Override + public String executor() { + return Names.SAME; + } + }); } else { logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 632e57dca49e2..438aec7944f92 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; +import org.elasticsearch.cluster.coordination.JoinHelper; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.OperationRouting; @@ -449,7 +450,8 @@ public void apply(Settings value, Settings current, Settings previous) { ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING, ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, - Coordinator.PUBLISH_TIMEOUT_SETTING + Coordinator.PUBLISH_TIMEOUT_SETTING, + JoinHelper.JOIN_TIMEOUT_SETTING ))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( From a9f3e008eb85990dfa897067e2608f798cad3a8b Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Oct 2018 21:49:18 +0100 Subject: [PATCH 32/32] Use assertThat for better error messages --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index adab0706f78d0..a29f1531c2d4a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -347,8 +347,9 @@ void addNodes(int newNodesCount) { void runRandomly() { - assert disconnectedNodes.isEmpty() : "may reconnect disconnected nodes, probably unexpected: " + disconnectedNodes; - assert blackholedNodes.isEmpty() : "may reconnect blackholed nodes, probably unexpected: " + blackholedNodes; + // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it + assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty()); + assertThat("may reconnect blackholed nodes, probably unexpected", blackholedNodes, empty()); final int randomSteps = scaledRandomIntBetween(10, 10000); logger.info("--> start of safety phase of at least [{}] steps", randomSteps);