diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 8a6e6d1833258..d5b9cdf6adfc3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -158,13 +158,25 @@ public void setInitialState(ClusterState initialState) { */ public Join handleStartJoin(StartJoinRequest startJoinRequest) { if (startJoinRequest.getTerm() <= getCurrentTerm()) { - logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]", - startJoinRequest.getTerm(), getCurrentTerm()); + logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]", + startJoinRequest, getCurrentTerm()); throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() + " not greater than current term " + getCurrentTerm()); } - logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm()); + logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest); + + if (joinVotes.isEmpty() == false) { + final String reason; + if (electionWon == false) { + reason = "failed election"; + } else if (startJoinRequest.getSourceNode().equals(localNode)) { + reason = "bumping term"; + } else { + reason = "standing down as leader"; + } + logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason); + } persistedState.setCurrentTerm(startJoinRequest.getTerm()); assert getCurrentTerm() == startJoinRequest.getTerm(); @@ -232,6 +244,7 @@ public boolean handleJoin(Join join) { join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()); if (electionWon && prevElectionWon == false) { + logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes); lastPublishedVersion = getLastAcceptedVersion(); } return added; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 2b112102a562d..2ac5bbf34c1d8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -99,7 +100,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat this.persistedStateSupplier = persistedStateSupplier; this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); - this.joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + this.joinAccumulator = new InitialJoinAccumulator(); this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings); this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index bc01612b899bc..500f1642329bf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -202,7 +202,6 @@ interface JoinAccumulator { void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback); default void close(Mode newMode) { - } } @@ -220,6 +219,19 @@ public String toString() { } } + static class InitialJoinAccumulator implements JoinAccumulator { + @Override + public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { + assert false : "unexpected join from " + sender + " during initialisation"; + joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet")); + } + + @Override + public String toString() { + return "InitialJoinAccumulator"; + } + } + static class FollowerJoinAccumulator implements JoinAccumulator { @Override public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { @@ -265,13 +277,14 @@ public void close(Mode newMode) { }); masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); - } else if (newMode == Mode.FOLLOWER) { + } else { + assert newMode == Mode.FOLLOWER : newMode; joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure( new CoordinationStateRejectedException("became follower"))); - } else { - assert newMode == Mode.CANDIDATE; - assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet(); } + + // CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received + // regardless of term. } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index 6d2486da69542..0adf483d5aa46 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -170,8 +170,8 @@ private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState cu ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder() .blocks(currentState.blocks()) .removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build(); - return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, - "removed dead nodes on election")); + logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes()); + return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election")); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 1a498f38389e9..584a7d0df11e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1221,7 +1221,17 @@ public void sendResponse(Exception exception) throws IOException { if (ThreadPool.Names.SAME.equals(executor)) { processException(handler, rtx); } else { - threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx)); + threadPool.executor(handler.executor()).execute(new Runnable() { + @Override + public void run() { + processException(handler, rtx); + } + + @Override + public String toString() { + return "delivery of exception response to [" + action + "][" + requestId + "]: " + exception; + } + }); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c8588d115fddb..0113e8de2c2fc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -76,6 +76,8 @@ public void testCanUpdateClusterStateAfterStabilisation() { final ClusterNode leader = cluster.getAnyLeader(); long finalValue = randomLong(); + + logger.info("--> submitting value [{}] to [{}]", finalValue, leader); leader.submitValue(finalValue); cluster.stabilise(); // TODO this should only need a short stabilisation @@ -96,6 +98,7 @@ class Cluster { final List clusterNodes; final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build()); private final VotingConfiguration initialConfiguration; @@ -155,7 +158,7 @@ private void assertUniqueLeaderAndExpectedModes() { final String nodeId = clusterNode.getId(); assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - assertTrue("leader should have received a vote from " + nodeId, + assertTrue("leader " + leader.getId() + " should have received a vote from " + nodeId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); @@ -267,7 +270,7 @@ boolean isLeader() { } void submitValue(final long value) { - masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { + onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return setValue(currentState, value); @@ -277,7 +280,12 @@ public ClusterState execute(ClusterState currentState) { public void onFailure(String source, Exception e) { logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); } - }); + })).run(); + } + + @Override + public String toString() { + return localNode.toString(); } }