Zen2: Add Cluster State Applier #34257
Changes from 4 commits
@@ -22,6 +22,7 @@
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -30,6 +31,8 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.service.ClusterApplier;
+import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
@@ -39,6 +42,7 @@
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoverySettings;
@@ -60,6 +64,9 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;

+import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
+
 public class Coordinator extends AbstractLifecycleComponent implements Discovery {

     // the timeout for the publication of each value
@@ -77,6 +84,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     final Object mutex = new Object();
     final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
     private volatile Optional<ClusterState> lastCommittedState = Optional.empty();
+    private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier

     private final PeerFinder peerFinder;
     private final PreVoteCollector preVoteCollector;
@@ -86,6 +94,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private final PublicationTransportHandler publicationHandler;
     private final LeaderChecker leaderChecker;
     private final FollowersChecker followersChecker;
+    private final ClusterApplier clusterApplier;
     @Nullable
     private Releasable electionScheduler;
     @Nullable
@@ -102,7 +111,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

     public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
                        MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier,
-                       UnicastHostsProvider unicastHostsProvider, Random random) {
+                       UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) {
         super(settings);
         this.transportService = transportService;
         this.masterService = masterService;
@@ -118,10 +127,12 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
         configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider);
         this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
             new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
-        this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
+        this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit,
+            logger);
         this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
         this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
         this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
+        this.clusterApplier = clusterApplier;
         masterService.setClusterStateSupplier(this::getStateForMasterService);
     }
@@ -167,13 +178,31 @@ private void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
         }
     }

-    private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
+    private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
         synchronized (mutex) {
             logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);

             coordinationState.get().handleCommit(applyCommitRequest);
             lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState());
-            // TODO: send to applier
+            applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(lastCommittedState.get()) : lastCommittedState.get();
+            if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
+                // master node applies the committed state at the end of the publication process, not here.
+                applyListener.onResponse(null);
+            } else {
+                clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState,
+                    new ClusterApplyListener() {
+
+                        @Override
+                        public void onFailure(String source, Exception e) {
+                            applyListener.onFailure(e);
+                        }
+
+                        @Override
+                        public void onSuccess(String source) {
+                            applyListener.onResponse(null);
+                        }
+                    });
+            }
         }
     }
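For context on the hunk above: on non-master nodes, handleApplyCommit now bridges the applier's ClusterApplyListener callbacks onto the ActionListener&lt;Void&gt; that acknowledges the commit back to the publishing node. A minimal, self-contained sketch of that adapter pattern, using hypothetical stand-in interfaces rather than the real Elasticsearch types, might look like this:

```java
// Hedged sketch only: ApplyListener and AckListener are stand-ins for
// ClusterApplier.ClusterApplyListener and ActionListener<Void>, not the real classes.
public class CommitListenerBridgeSketch {

    interface ApplyListener {                        // stand-in for ClusterApplyListener
        void onSuccess(String source);
        void onFailure(String source, Exception e);
    }

    interface AckListener {                          // stand-in for ActionListener<Void>
        void onResponse(Void ignored);
        void onFailure(Exception e);
    }

    // The adapter idea used in handleApplyCommit: once the local applier has finished,
    // the outcome is forwarded to the listener that acknowledges the commit.
    static ApplyListener ackWhenApplied(AckListener ackListener) {
        return new ApplyListener() {
            @Override
            public void onSuccess(String source) {
                ackListener.onResponse(null);        // state applied locally -> ack the commit
            }

            @Override
            public void onFailure(String source, Exception e) {
                ackListener.onFailure(e);            // application failed -> report the failure
            }
        };
    }

    public static void main(String[] args) {
        ApplyListener listener = ackWhenApplied(new AckListener() {
            @Override public void onResponse(Void ignored) { System.out.println("commit acked"); }
            @Override public void onFailure(Exception e) { System.out.println("commit failed: " + e); }
        });
        listener.onSuccess("demo-publication");      // prints "commit acked"
    }
}
```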
@@ -299,6 +328,12 @@ void becomeCandidate(String method) {

             followersChecker.clearCurrentNodes();
             followersChecker.updateFastResponseState(getCurrentTerm(), mode);
+
+            if (applierState.nodes().getMasterNodeId() != null) {
+                applierState = clusterStateWithNoMasterBlock(applierState);
+                clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
+                });
+            }
         }

         preVoteCollector.update(getPreVoteResponse(), null);
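The `(source, e) -> {}` argument in the hunk above compiles because, as far as I can tell, ClusterApplyListener's onSuccess has a default no-op implementation, leaving onFailure as the single abstract method; becoming a candidate therefore hands the no-master state to the applier on a fire-and-forget basis. A stand-alone sketch of that shape, using a stand-in interface rather than the real one:

```java
// Hedged sketch: ApplyListener stands in for ClusterApplier.ClusterApplyListener as this
// change appears to use it. With onSuccess given a default no-op body, the interface is
// effectively functional over onFailure, so "(source, e) -> {}" is a valid listener.
public class NoOpApplyListenerSketch {

    interface ApplyListener {
        default void onSuccess(String source) {}     // default no-op success callback
        void onFailure(String source, Exception e);  // single abstract method -> lambda-friendly
    }

    static void applyState(String source, ApplyListener listener) {
        // pretend the new state was applied successfully
        listener.onSuccess(source);
    }

    public static void main(String[] args) {
        // Fire-and-forget, as in becomeCandidate: the applier is given the no-master state
        // and the outcome is deliberately ignored.
        applyState("becoming candidate: demo", (source, e) -> {});
        System.out.println("no-master state handed to the applier; result not tracked");
    }
}
```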
@@ -385,10 +420,20 @@ boolean publicationInProgress() {

     @Override
     protected void doStart() {
-        CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
-        coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
-        peerFinder.setCurrentTerm(getCurrentTerm());
-        configuredHostsResolver.start();
+        synchronized (mutex) {
+            CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
+            coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
+            peerFinder.setCurrentTerm(getCurrentTerm());
+            configuredHostsResolver.start();
+            ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
+                .blocks(ClusterBlocks.builder()
+                    .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
+                    .addGlobalBlock(NO_MASTER_BLOCK_WRITES))
+                .nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId()))
+                .build();
+            applierState = initialState;
+            clusterApplier.setInitialState(initialState);
+        }
     }

     @Override
@@ -419,6 +464,13 @@ public void invariant() {
             assert peerFinder.getCurrentTerm() == getCurrentTerm();
             assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
             assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
+            if (lastCommittedState.isPresent()) {
+                assert applierState != null;
+                assert lastCommittedState.get().term() == applierState.term();
+                assert lastCommittedState.get().version() == applierState.version();
+            }
+            assert mode != Mode.CANDIDATE || applierState.nodes().getMasterNodeId() == null;
+            assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
             if (mode == Mode.LEADER) {
                 final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();

@@ -433,7 +485,8 @@

                 final Set<DiscoveryNode> knownFollowers = followersChecker.getKnownFollowers();
                 final Set<DiscoveryNode> lastPublishedNodes = new HashSet<>();
-                if (becomingMaster == false || publicationInProgress()) {
+                if (becomingMaster == false ||
+                    (publicationInProgress() && getCurrentTerm() == currentPublication.get().publishedState().term())) {
                     final ClusterState lastPublishedState
                         = currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState());
                     lastPublishedState.nodes().forEach(lastPublishedNodes::add);
@@ -524,7 +577,7 @@ private ClusterState clusterStateWithNoMasterBlock(ClusterState clusterState) {
                 "NO_MASTER_BLOCK should only be added by Coordinator";
             // TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL
             final ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks()).addGlobalBlock(
-                DiscoverySettings.NO_MASTER_BLOCK_WRITES).build();
+                NO_MASTER_BLOCK_WRITES).build();
             final DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
             return ClusterState.builder(clusterState).blocks(clusterBlocks).nodes(discoveryNodes).build();
         } else {
@@ -593,42 +646,68 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
         final Publication publication = new Publication(settings, publishRequest, wrappedAckListener,
             transportService.getThreadPool()::relativeTimeInMillis) {

+            final Publication thisPublication = this;
+
+            private void failPublicationAndPossiblyBecomeCandidate(String reason) {
+                assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
+
+                assert currentPublication.get() == this;
+                currentPublication = Optional.empty();
+
+                // check if node has not already switched modes (by bumping term)
+                if (mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm()) {
+                    becomeCandidate(reason);
+                }
+            }
+
             @Override
             protected void onCompletion(boolean committed) {
                 assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
-                assert currentPublication.get() == this;
-                currentPublication = Optional.empty();
-                updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication

                 localNodeAckEvent.addListener(new ActionListener<Void>() {
                     @Override
                     public void onResponse(Void ignore) {
                         assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
                         assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term()
                             && coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version()
                             : "onPossibleCompletion: term or version mismatch when publishing [" + this
                             + "]: current version is now [" + coordinationState.get().getLastAcceptedVersion()
                             + "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]";
                         assert committed;

-                        // TODO: send to applier
-                        ackListener.onNodeAck(getLocalNode(), null);
-                        publishListener.onResponse(null);
+                        clusterApplier.onNewClusterState(thisPublication.toString(), () -> applierState,
+                            new ClusterApplyListener() {
+                                @Override
+                                public void onFailure(String source, Exception e) {
+                                    synchronized (mutex) {
+                                        failPublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
+                                    }
+                                    ackListener.onNodeAck(getLocalNode(), e);
+                                    publishListener.onFailure(e);
+                                }
+
+                                @Override
+                                public void onSuccess(String source) {
+                                    synchronized (mutex) {
+                                        assert currentPublication.get() == thisPublication;
+                                        currentPublication = Optional.empty();
+                                        // trigger term bump if new term was found during publication
+                                        updateMaxTermSeen(getCurrentTerm());
+                                    }
+
+                                    ackListener.onNodeAck(getLocalNode(), null);
+                                    publishListener.onResponse(null);
+                                }
+                            });
                     }

                     @Override
                     public void onFailure(Exception e) {
                         assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
-                        if (publishRequest.getAcceptedState().term() == coordinationState.get().getCurrentTerm() &&
-                            publishRequest.getAcceptedState().version() == coordinationState.get().getLastPublishedVersion()) {
-                            becomeCandidate("Publication.onCompletion(false)");
-                        }
+                        failPublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");

                         FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException(
                             "publication failed", e);
                         ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
                         publishListener.onFailure(exception);
                     }
-                }, transportService.getThreadPool().generic());
+                }, EsExecutors.newDirectExecutorService());
             }

             @Override
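One detail worth noting in the hunk above: the localNodeAckEvent listener now runs on a direct executor rather than the generic thread pool, so it executes inline on the thread that completes the ack event, under the publication flow. EsExecutors.newDirectExecutorService() behaves along those lines in Elasticsearch; the JDK-only sketch below illustrates the direct-execution idea (class and variable names here are illustrative):

```java
// Hedged sketch of the direct-executor idea: execute() runs the task synchronously on the
// calling thread, with no handoff to a pool. This mirrors what the switch away from
// transportService.getThreadPool().generic() is going for, without using Elasticsearch APIs.
import java.util.concurrent.Executor;

public class DirectExecutorSketch {

    static final Executor DIRECT = Runnable::run;    // "execute" simply invokes run() inline

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        DIRECT.execute(() ->
            System.out.println("task thread:   " + Thread.currentThread().getName()));
        // Both lines print the same thread name: the listener-style task ran inline.
    }
}
```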
@@ -667,8 +746,6 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
             }
         };

-        assert currentPublication.isPresent() == false
-            : "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']';
         currentPublication = Optional.of(publication);

         transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@@ -717,7 +794,6 @@ private void cancelActivePublication() {
         assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
         if (currentPublication.isPresent()) {
             currentPublication.get().onTimeout();
-            assert currentPublication.isPresent() == false;
         }
     }