-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Zen2: Add leader-side join handling logic #33013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
ef976f8
348fb4e
554263a
00463fe
391989b
7055a70
36b4b39
6b59200
5fb85fb
544560a
85a272a
2757c07
1ce4fdc
4676116
7dcd656
bd242ed
6f58a8c
a4b63f5
a2e9fc3
5a0efb5
aa9ce25
64e3cd4
d45cc11
49623cb
06ab9e1
5b390a8
1c522ce
66825c7
6566efb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,214 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.elasticsearch.cluster.coordination; | ||
|
|
||
| import org.apache.lucene.util.SetOnce; | ||
| import org.elasticsearch.cluster.coordination.JoinHelper.JoinCallback; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.routing.allocation.AllocationService; | ||
| import org.elasticsearch.cluster.service.MasterService; | ||
| import org.elasticsearch.common.component.AbstractLifecycleComponent; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.transport.TransportService; | ||
|
|
||
| import java.util.Optional; | ||
| import java.util.function.Supplier; | ||
|
|
||
| public class Coordinator extends AbstractLifecycleComponent { | ||
|
|
||
| private final TransportService transportService; | ||
| private final JoinHelper joinHelper; | ||
| private final Supplier<CoordinationState.PersistedState> persistedStateSupplier; | ||
| final Object mutex = new Object(); | ||
| final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) | ||
|
|
||
| private Mode mode; | ||
| private Optional<DiscoveryNode> lastKnownLeader; | ||
| private Optional<Join> lastJoin; | ||
|
|
||
| /** | ||
| * Creates a coordinator that starts with no known leader and no recorded join. The coordination state itself | ||
| * is not created here; it is set once in {@link #doStart()} from {@code persistedStateSupplier}. | ||
| */ | ||
| public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, | ||
| MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier) { | ||
| super(settings); | ||
| this.lastKnownLeader = Optional.empty(); | ||
| this.lastJoin = Optional.empty(); | ||
| this.persistedStateSupplier = persistedStateSupplier; | ||
| this.transportService = transportService; | ||
| // the join helper calls back into this coordinator for the current term and for every incoming join request | ||
| this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, | ||
| this::handleJoinRequest); | ||
| } | ||
|
|
||
| private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { | ||
| assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; | ||
| if (getCurrentTerm() < targetTerm) { | ||
| return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm))); | ||
| } | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { | ||
| assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; | ||
| logger.debug("joinLeaderInTerm: from [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm()); | ||
| Join join = coordinationState.get().handleStartJoin(startJoinRequest); | ||
| lastJoin = Optional.of(join); | ||
| if (mode != Mode.CANDIDATE) { | ||
| becomeCandidate("joinLeaderInTerm"); | ||
| } | ||
| return join; | ||
| } | ||
|
|
||
| /** | ||
| * Handles an incoming join request: connects to the joining node and then processes the request under the mutex. | ||
| * This method is passed to the {@code JoinHelper} as its join request handler in the constructor. | ||
| * | ||
| * @param joinRequest the request to handle | ||
| * @param joinCallback notified of the outcome of the join | ||
| */ | ||
| public void handleJoinRequest(JoinRequest joinRequest, JoinCallback joinCallback) { | ||
| assert Thread.holdsLock(mutex) == false; | ||
| // the connection is established before the mutex is taken (see the assertion above) | ||
| // NOTE(review): presumably because connecting may block - confirm | ||
| transportService.connectToNode(joinRequest.getSourceNode()); | ||
|
|
||
| synchronized (mutex) { | ||
| handleJoinRequestUnderLock(joinRequest, joinCallback); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Processes a join request while holding the mutex. Any join vote carried by the request is first applied to the | ||
| * coordination state; the request is then routed according to whether this request won the election and to the | ||
| * current mode. | ||
| * <p> | ||
| * If the election has not been won, an exception thrown by {@code CoordinationState#handleJoin} propagates to the caller. | ||
| * | ||
| * @param joinRequest the request to handle | ||
| * @param joinCallback notified of the outcome of the join | ||
| */ | ||
| private void handleJoinRequestUnderLock(JoinRequest joinRequest, JoinCallback joinCallback) { | ||
| assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; | ||
| logger.trace("handleJoinRequestUnderLock: as {}, handling {}", mode, joinRequest); | ||
|
|
||
| final CoordinationState coordState = coordinationState.get(); | ||
| // captured before the join is applied so that we can tell below whether this request won the election | ||
| final boolean prevElectionWon = coordState.electionWon(); | ||
|
|
||
| if (joinRequest.getOptionalJoin().isPresent()) { | ||
| final Join join = joinRequest.getOptionalJoin().get(); | ||
| // if someone thinks we should be master, let's add our vote and try to become one | ||
| // note that the following line should never throw an exception | ||
| ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(coordState::handleJoin); | ||
|
|
||
| // if we have already won the election, then the actual join does not matter for election purposes | ||
| if (coordState.electionWon()) { | ||
| // add join on a best-effort basis | ||
| try { | ||
| coordState.handleJoin(join); | ||
| } catch (CoordinationStateRejectedException e) { | ||
| logger.trace("failed to add join, ignoring", e); | ||
| } | ||
| } else { | ||
| coordState.handleJoin(join); // this might fail and bubble up the exception | ||
| } | ||
| } | ||
|
|
||
| if (prevElectionWon == false && coordState.electionWon()) { | ||
| // this request won the election: queue it with the other pending joins, become leader, then submit them all | ||
| joinHelper.addPendingJoin(joinRequest, joinCallback); | ||
| becomeLeader("handleJoin"); | ||
| joinHelper.clearAndSubmitPendingJoins(); | ||
| } else if (mode == Mode.LEADER) { | ||
| // already leader: submit this join on its own | ||
| joinHelper.joinLeader(joinRequest, joinCallback); | ||
| } else if (mode == Mode.FOLLOWER) { | ||
| joinCallback.onFailure(new CoordinationStateRejectedException("join target is a follower")); | ||
| } else { | ||
| assert mode == Mode.CANDIDATE; | ||
| // not elected yet: hold the join until this node becomes leader (submitted) or follower (failed) | ||
| joinHelper.addPendingJoin(joinRequest, joinCallback); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Puts this node into {@link Mode#CANDIDATE}. The transition is logged even if the node is already a candidate. | ||
| * The caller must hold {@code mutex}. | ||
| * | ||
| * @param method name of the calling method, used only in the debug log message | ||
| */ | ||
| void becomeCandidate(String method) { | ||
| assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; | ||
| logger.debug("{}: becoming CANDIDATE (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); | ||
|
|
||
| if (mode != Mode.CANDIDATE) { | ||
| mode = Mode.CANDIDATE; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Puts this node into {@link Mode#LEADER} and records the local node as the last known leader. | ||
| * The node is expected to be a candidate when this is called, and the caller must hold {@code mutex}. | ||
| * | ||
| * @param method name of the calling method, used only in the debug log message | ||
| */ | ||
| void becomeLeader(String method) { | ||
| assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; | ||
| assert mode == Mode.CANDIDATE : "expected candidate but was " + mode; | ||
| logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); | ||
|
|
||
| mode = Mode.LEADER; | ||
| lastKnownLeader = Optional.of(getLocalNode()); | ||
| } | ||
|
|
||
| /** | ||
| * Puts this node into {@link Mode#FOLLOWER} and records {@code leaderNode} as the last known leader. | ||
| * If the node was not already a follower, all pending joins are failed. The caller must hold {@code mutex}. | ||
| * | ||
| * @param method name of the calling method, used only in the debug log message | ||
| * @param leaderNode the node that is being followed | ||
| */ | ||
| void becomeFollower(String method, DiscoveryNode leaderNode) { | ||
| assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; | ||
| logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader); | ||
|
|
||
| if (mode != Mode.FOLLOWER) { | ||
| mode = Mode.FOLLOWER; | ||
| joinHelper.clearAndFailPendingJoins("following another master : " + leaderNode); | ||
| } | ||
|
|
||
| // updated even when already a follower, since the leader being followed may have changed | ||
| lastKnownLeader = Optional.of(leaderNode); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the current term of the coordination state, read under the mutex. The coordination state is only | ||
| * set in {@link #doStart()}, so this must not be called before the coordinator has been started. | ||
| */ | ||
| // package-visible for testing | ||
| long getCurrentTerm() { | ||
| synchronized (mutex) { | ||
| return coordinationState.get().getCurrentTerm(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the current mode, read under the mutex. This is {@code null} until the first mode transition. | ||
| */ | ||
| // package-visible for testing | ||
| Mode getMode() { | ||
| synchronized (mutex) { | ||
| return mode; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the local node as reported by the transport service. | ||
| */ | ||
| // package-visible for testing | ||
| DiscoveryNode getLocalNode() { | ||
| return transportService.getLocalNode(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void doStart() { | ||
| CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); | ||
| coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState)); | ||
| } | ||
|
|
||
| /** | ||
| * Puts this node into candidate mode under the mutex. | ||
| */ | ||
| public void startInitialJoin() { | ||
| synchronized (mutex) { | ||
| becomeCandidate("startInitialJoin"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void doStop() { | ||
| // no stop-time work is performed by this component | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
| protected void doClose() { | ||
| // no close-time work is performed by this component | ||
|
|
||
| } | ||
|
|
||
| /** | ||
| * Checks, under the mutex, that the mode is consistent with the election state, the last known leader and the | ||
| * number of joins pending in the join helper. All checks are {@code assert} statements, so this only has an | ||
| * effect when assertions are enabled. | ||
| */ | ||
| public void invariant() { | ||
| synchronized (mutex) { | ||
| if (mode == Mode.LEADER) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can restore those assertions about the state of the join helper here - i.e. no accumulated joins when leader or follower. |
||
| // a leader has won its election, knows itself as leader and holds no unsubmitted joins | ||
| assert coordinationState.get().electionWon(); | ||
| assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode()); | ||
| assert joinHelper.getNumberOfPendingJoins() == 0; | ||
| } else if (mode == Mode.FOLLOWER) { | ||
| // a follower has not won an election, knows some other node as leader and holds no pending joins | ||
| assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; | ||
| assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); | ||
| assert joinHelper.getNumberOfPendingJoins() == 0; | ||
| } else { | ||
| assert mode == Mode.CANDIDATE; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * The role this coordinator currently has: candidate, leader or follower. | ||
| */ | ||
| public enum Mode { | ||
| CANDIDATE, LEADER, FOLLOWER | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.elasticsearch.cluster.coordination; | ||
|
|
||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.ClusterStateTaskConfig; | ||
| import org.elasticsearch.cluster.ClusterStateTaskListener; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.routing.allocation.AllocationService; | ||
| import org.elasticsearch.cluster.service.MasterService; | ||
| import org.elasticsearch.common.Priority; | ||
| import org.elasticsearch.common.component.AbstractComponent; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.TransportResponse; | ||
| import org.elasticsearch.transport.TransportService; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.LongSupplier; | ||
|
|
||
| public class JoinHelper extends AbstractComponent { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure the separation of responsibilities between this and the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The separation I had in mind here was that JoinHelper would be responsible for all join-related transport actions (i.e. later also have a sendJoin method + the startjoin stuff) and all MasterService-related join tasks, similar to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggested a slightly different split in |
||
|
|
||
| public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; | ||
|
|
||
| private final MasterService masterService; | ||
| private final TransportService transportService; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is only used in the constructor, doesn't need to be a field.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, not yet. The follow-up PR will add a send method to this class, so it will be of direct use then. If you feel strongly about this, I can revert.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, that's fine. |
||
| private final JoinTaskExecutor joinTaskExecutor; | ||
| private final Map<DiscoveryNode, JoinCallback> joinRequestAccumulator = new HashMap<>(); | ||
|
|
||
| /** | ||
| * Creates the join helper, sets up the join task executor and registers the transport handler for | ||
| * {@link #JOIN_ACTION_NAME}. | ||
| * | ||
| * @param currentTermSupplier supplies the current term, which the join task executor applies to the cluster state | ||
| * @param joinRequestHandler receives every incoming join request together with a callback that answers the | ||
| * request over its transport channel | ||
| */ | ||
| public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, | ||
| TransportService transportService, LongSupplier currentTermSupplier, | ||
| BiConsumer<JoinRequest, JoinCallback> joinRequestHandler) { | ||
| super(settings); | ||
| this.masterService = masterService; | ||
| this.transportService = transportService; | ||
| this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { | ||
|
|
||
| @Override | ||
| public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks) | ||
| throws Exception { | ||
| // This is called when preparing the next cluster state for publication. There is no guarantee that the term we see here is | ||
| // the term under which this state will eventually be published: the current term may be increased after this check due to | ||
| // some other activity. That the term is correct is, however, checked properly during publication, so it is sufficient to | ||
| // check it here on a best-effort basis. This is fine because a concurrent change indicates the existence of another leader | ||
| // in a higher term which will cause this node to stand down. | ||
|
|
||
| final long currentTerm = currentTermSupplier.getAsLong(); | ||
| if (currentState.term() != currentTerm) { | ||
| // rebuild the state with the current term before delegating to the standard join handling | ||
| currentState = ClusterState.builder(currentState).term(currentTerm).build(); | ||
| } | ||
| return super.execute(currentState, joiningTasks); | ||
| } | ||
|
|
||
| }; | ||
|
|
||
| transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new, | ||
| (request, channel, task) -> joinRequestHandler.accept(request, new JoinCallback() { | ||
|
||
| @Override | ||
| public void onSuccess() { | ||
| try { | ||
| channel.sendResponse(TransportResponse.Empty.INSTANCE); | ||
| } catch (IOException e) { | ||
| // could not send the empty success response: report that through the failure path instead | ||
| onFailure(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| try { | ||
| channel.sendResponse(e); | ||
| } catch (Exception inner) { | ||
| // keep the original failure attached to the send failure so that neither is lost in the log | ||
| inner.addSuppressed(e); | ||
| logger.warn("failed to send back failure on join request", inner); | ||
| } | ||
| } | ||
| })); | ||
| } | ||
|
|
||
| /** | ||
| * Remembers the callback of a join request until the pending joins are submitted or failed. Only one pending | ||
| * join is kept per source node: an earlier callback for the same node is failed when a newer join replaces it. | ||
| */ | ||
| public void addPendingJoin(JoinRequest joinRequest, JoinCallback joinCallback) { | ||
| final JoinCallback superseded = joinRequestAccumulator.put(joinRequest.getSourceNode(), joinCallback); | ||
| if (superseded == null) { | ||
| return; | ||
| } | ||
| superseded.onFailure(new CoordinationStateRejectedException("received a newer join from " + joinRequest.getSourceNode())); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the number of joins that have been accumulated but not yet submitted or failed. | ||
| */ | ||
| public int getNumberOfPendingJoins() { | ||
| return joinRequestAccumulator.size(); | ||
| } | ||
|
|
||
| /** | ||
| * Fails every accumulated join with a {@code CoordinationStateRejectedException} carrying {@code reason}, | ||
| * then forgets them all. | ||
| */ | ||
| public void clearAndFailPendingJoins(String reason) { | ||
| for (JoinCallback pendingCallback : joinRequestAccumulator.values()) { | ||
| pendingCallback.onFailure(new CoordinationStateRejectedException(reason)); | ||
| } | ||
| joinRequestAccumulator.clear(); | ||
| } | ||
|
|
||
| /** | ||
| * Drains the accumulated joins and submits them to the master service as one batch of cluster state update | ||
| * tasks, together with the {@code BECOME_MASTER_TASK} and {@code FINISH_ELECTION_TASK} marker tasks. | ||
| */ | ||
| public void clearAndSubmitPendingJoins() { | ||
| final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new HashMap<>(); | ||
| joinRequestAccumulator.forEach((key, value) -> pendingAsTasks.put(new JoinTaskExecutor.Task(key, "elect leader"), | ||
| new JoinTaskListener(value))); | ||
| joinRequestAccumulator.clear(); | ||
|
|
||
| // describe the update before the two marker tasks are added, so that the reported count covers joining nodes only | ||
| final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)"; | ||
| pendingAsTasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source, e) -> {}); | ||
| pendingAsTasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, (source, e) -> {}); | ||
| masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), | ||
| joinTaskExecutor); | ||
| } | ||
|
|
||
| /** | ||
| * Submits a single join to the master service as an urgent cluster state update task; {@code joinCallback} is | ||
| * notified through a {@code JoinTaskListener} once the task has been processed or has failed. | ||
| */ | ||
| public void joinLeader(JoinRequest joinRequest, JoinCallback joinCallback) { | ||
| final JoinTaskExecutor.Task joinTask = new JoinTaskExecutor.Task(joinRequest.getSourceNode(), "join existing leader"); | ||
| final ClusterStateTaskConfig urgentConfig = ClusterStateTaskConfig.build(Priority.URGENT); | ||
| final JoinTaskListener listener = new JoinTaskListener(joinCallback); | ||
| masterService.submitStateUpdateTask("node-join", joinTask, urgentConfig, joinTaskExecutor, listener); | ||
| } | ||
|
|
||
| /** | ||
| * Receives the outcome of handling a join request. | ||
| */ | ||
| public interface JoinCallback { | ||
| // called when the join has been handled successfully | ||
| void onSuccess(); | ||
|
|
||
| // called with the cause when the join has failed or been rejected | ||
| void onFailure(Exception e); | ||
| } | ||
|
|
||
| /** | ||
| * Adapts a {@link JoinCallback} to a {@code ClusterStateTaskListener}: a failed task fails the callback and a | ||
| * processed cluster state completes it successfully. | ||
| */ | ||
| static class JoinTaskListener implements ClusterStateTaskListener { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this implement
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added in 4676116 |
||
| private final JoinCallback joinCallback; | ||
|
|
||
| JoinTaskListener(JoinCallback joinCallback) { | ||
| this.joinCallback = joinCallback; | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(String source, Exception e) { | ||
| // the task source is not forwarded; only the cause reaches the callback | ||
| joinCallback.onFailure(e); | ||
| } | ||
|
|
||
| @Override | ||
| public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { | ||
| joinCallback.onSuccess(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can't happen as a `LEADER`, but can it happen as a `FOLLOWER`? I think it can't, because we must have bumped our term too. Therefore, can this be reorganised as a switch on `mode`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great observation. The scenario I had in mind while writing these conditions was as follows: assume you want to do a leader handoff. You send a startJoin to all nodes, telling them to join the new prospective leader. Assume the corresponding join from one node arrives on the prospective leader (which is still a follower) before the prospective leader itself has received the start join. Handling this join, which has a higher term, will trigger `ensureTermAtLeast`, turn the node into a candidate, then handle the join, and then reach this point here. So yes, it sounds like we could fold this check into the CANDIDATE branch here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've simplified this in bd242ed