diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 2ac2f1081c4e4..05b7b438b7f9f 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -89,22 +89,26 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) { logger.trace("activating with {}", lastAcceptedNodes); synchronized (mutex) { - assert active == false; + assert assertInactiveWithNoKnownPeers(); active = true; this.lastAcceptedNodes = lastAcceptedNodes; leader = Optional.empty(); - handleWakeUp(); + handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected } } public void deactivate(DiscoveryNode leader) { + final boolean peersRemoved; synchronized (mutex) { logger.trace("deactivating and setting leader to {}", leader); active = false; - handleWakeUp(); + peersRemoved = handleWakeUp(); this.leader = Optional.of(leader); assert assertInactiveWithNoKnownPeers(); } + if (peersRemoved) { + onFoundPeersUpdated(); + } } // exposed to subclasses for testing @@ -114,7 +118,7 @@ protected final boolean holdsLock() { boolean assertInactiveWithNoKnownPeers() { assert active == false; - assert peersByAddress.isEmpty(); + assert peersByAddress.isEmpty() : peersByAddress.keySet(); return true; } @@ -142,10 +146,20 @@ private DiscoveryNode getLocalNode() { } /** - * Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join. + * Invoked on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join. + * Note that invocations of this method are not synchronised. By the time it is called we may have been deactivated. */ protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term); + /** + * Invoked when the set of found peers changes. Note that invocations of this method are not fully synchronised, so we only guarantee + * that the change to the set of found peers happens before this method is invoked. If there are multiple concurrent changes then there + * will be multiple concurrent invocations of this method, with no guarantee as to their order. For this reason we do not pass the + * updated set of peers as an argument to this method, leaving it to the implementation to call getFoundPeers() with appropriate + * synchronisation to avoid lost updates. Also, by the time this method is invoked we may have been deactivated. + */ + protected abstract void onFoundPeersUpdated(); + public interface TransportAddressConnector { /** * Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it. @@ -170,7 +184,6 @@ public Iterable getFoundPeers() { } private List getFoundPeersUnderLock() { - assert active; assert holdsLock() : "PeerFinder mutex not held"; return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList()); } @@ -181,16 +194,21 @@ private Peer createConnectingPeer(TransportAddress transportAddress) { return peer; } - private void handleWakeUp() { + /** + * @return whether any peers were removed due to disconnection + */ + private boolean handleWakeUp() { assert holdsLock() : "PeerFinder mutex not held"; + boolean peersRemoved = false; + for (final Peer peer : peersByAddress.values()) { - peer.handleWakeUp(); + peersRemoved = peer.handleWakeUp() || peersRemoved; // care: avoid short-circuiting, each peer needs waking up } if (active == false) { logger.trace("not active"); - return; + return peersRemoved; } logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes); @@ -220,8 +238,11 @@ public void onFailure(Exception e) { @Override protected void doRun() { synchronized (mutex) { - handleWakeUp(); + if (handleWakeUp() == false) { + return; + } } + onFoundPeersUpdated(); } @Override @@ -229,6 +250,8 @@ public String toString() { return "PeerFinder::handleWakeUp"; } }); + + return peersRemoved; } private void startProbe(TransportAddress transportAddress) { @@ -260,12 +283,12 @@ DiscoveryNode getDiscoveryNode() { return discoveryNode.get(); } - void handleWakeUp() { + boolean handleWakeUp() { assert holdsLock() : "PeerFinder mutex not held"; if (active == false) { removePeer(); - return; + return true; } final DiscoveryNode discoveryNode = getDiscoveryNode(); @@ -279,8 +302,11 @@ void handleWakeUp() { } else { logger.trace("{} no longer connected", this); removePeer(); + return true; } } + + return false; } void establishConnection() { @@ -295,12 +321,17 @@ public void onResponse(DiscoveryNode remoteNode) { assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible"; assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node"; synchronized (mutex) { - if (active) { - assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get(); - discoveryNode.set(remoteNode); - requestPeers(); + if (active == false) { + return; } + + assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get(); + discoveryNode.set(remoteNode); + requestPeers(); } + + assert holdsLock() == false : "PeerFinder mutex is held in error"; + onFoundPeersUpdated(); } @Override diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index b924344e868b0..d1e669f6cc3d8 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -86,6 +86,7 @@ public class PeerFinderTests extends ESTestCase { private Set connectedNodes = new HashSet<>(); private DiscoveryNodes lastAcceptedNodes; private TransportService transportService; + private Iterable foundPeersFromNotification; private static long CONNECTION_TIMEOUT_MILLIS = 30000; @@ -156,6 +157,13 @@ protected void onActiveMasterFound(DiscoveryNode masterNode, long term) { discoveredMasterNode = masterNode; discoveredMasterTerm = OptionalLong.of(term); } + + @Override + protected void onFoundPeersUpdated() { + assert holdsLock() == false : "PeerFinder lock held in error"; + foundPeersFromNotification = getFoundPeers(); + logger.trace("onFoundPeersUpdated({})", foundPeersFromNotification); + } } private void resolveConfiguredHosts(Consumer> onResult) { @@ -214,13 +222,13 @@ public void setup() { lastAcceptedNodes = DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build(); peerFinder = new TestPeerFinder(settings, transportService, transportAddressConnector); + foundPeersFromNotification = emptyList(); } @After public void deactivateAndRunRemainingTasks() { peerFinder.deactivate(localNode); - deterministicTaskQueue.runAllTasks(); // termination ensures that everything is properly cleaned up - peerFinder.assertInactiveWithNoKnownPeers(); // should eventually have no nodes when deactivated + deterministicTaskQueue.runAllRunnableTasks(random()); } public void testAddsReachableNodesFromUnicastHostsList() { @@ -693,6 +701,13 @@ private void assertFoundPeers(DiscoveryNode... expectedNodesArray) { final Stream expectedNodes = Arrays.stream(expectedNodesArray); final Stream actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false); assertThat(actualNodes.collect(Collectors.toSet()), equalTo(expectedNodes.collect(Collectors.toSet()))); + assertNotifiedOfAllUpdates(); + } + + private void assertNotifiedOfAllUpdates() { + final Stream actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false); + final Stream notifiedNodes = StreamSupport.stream(foundPeersFromNotification.spliterator(), false); + assertThat(notifiedNodes.collect(Collectors.toSet()), equalTo(actualNodes.collect(Collectors.toSet()))); } private DiscoveryNode newDiscoveryNode(String nodeId) { @@ -700,7 +715,19 @@ private DiscoveryNode newDiscoveryNode(String nodeId) { } private void runAllRunnableTasks() { + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + PeerFinderTests.this.assertNotifiedOfAllUpdates(); + } + + @Override + public String toString() { + return "assertNotifiedOfAllUpdates"; + } + }); deterministicTaskQueue.runAllRunnableTasks(random()); + assertNotifiedOfAllUpdates(); } }