Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,25 @@ public void setInitialState(ClusterState initialState) {
*/
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
if (startJoinRequest.getTerm() <= getCurrentTerm()) {
logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]",
startJoinRequest.getTerm(), getCurrentTerm());
logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
startJoinRequest, getCurrentTerm());
throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() +
" not greater than current term " + getCurrentTerm());
}

logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm());
logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);

if (joinVotes.isEmpty() == false) {
final String reason;
if (electionWon == false) {
reason = "failed election";
} else if (startJoinRequest.getSourceNode().equals(localNode)) {
reason = "bumping term";
} else {
reason = "standing down as leader";
}
logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason);
}

persistedState.setCurrentTerm(startJoinRequest.getTerm());
assert getCurrentTerm() == startJoinRequest.getTerm();
Expand Down Expand Up @@ -232,6 +244,7 @@ public boolean handleJoin(Join join) {
join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());

if (electionWon && prevElectionWon == false) {
logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
lastPublishedVersion = getLastAcceptedVersion();
}
return added;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -99,7 +100,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
this.persistedStateSupplier = persistedStateSupplier;
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ interface JoinAccumulator {
void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback);

default void close(Mode newMode) {

}
}

Expand All @@ -220,6 +219,19 @@ public String toString() {
}
}

static class InitialJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
assert false : "unexpected join from " + sender + " during initialisation";
joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet"));
}

@Override
public String toString() {
return "InitialJoinAccumulator";
}
}

static class FollowerJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
Expand Down Expand Up @@ -265,13 +277,14 @@ public void close(Mode newMode) {
});
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);
} else if (newMode == Mode.FOLLOWER) {
} else {
assert newMode == Mode.FOLLOWER : newMode;
joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure(
new CoordinationStateRejectedException("became follower")));
} else {
assert newMode == Mode.CANDIDATE;
assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet();
}

// CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received
// regardless of term.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState cu
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
.blocks(currentState.blocks())
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
"removed dead nodes on election"));
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,17 @@ public void sendResponse(Exception exception) throws IOException {
if (ThreadPool.Names.SAME.equals(executor)) {
processException(handler, rtx);
} else {
threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx));
threadPool.executor(handler.executor()).execute(new Runnable() {
@Override
public void run() {
processException(handler, rtx);
}

@Override
public String toString() {
return "delivery of exception response to [" + action + "][" + requestId + "]: " + exception;
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public void testCanUpdateClusterStateAfterStabilisation() {

final ClusterNode leader = cluster.getAnyLeader();
long finalValue = randomLong();

logger.info("--> submitting value [{}] to [{}]", finalValue, leader);
leader.submitValue(finalValue);
cluster.stabilise(); // TODO this should only need a short stabilisation

Expand All @@ -96,6 +98,7 @@ class Cluster {

final List<ClusterNode> clusterNodes;
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
// TODO does ThreadPool need a node name any more?
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;

Expand Down Expand Up @@ -155,7 +158,7 @@ private void assertUniqueLeaderAndExpectedModes() {

final String nodeId = clusterNode.getId();
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
assertTrue("leader should have received a vote from " + nodeId,
assertTrue("leader " + leader.getId() + " should have received a vote from " + nodeId,
leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));

assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
Expand Down Expand Up @@ -267,7 +270,7 @@ boolean isLeader() {
}

void submitValue(final long value) {
masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return setValue(currentState, value);
Expand All @@ -277,7 +280,12 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
}
});
})).run();
}

@Override
public String toString() {
return localNode.toString();
}
}

Expand Down