Skip to content

Commit 6d6ac74

Browse files
authored
Zen2: Fail fast on disconnects (#34503)
Integrates the failure detectors with the connection lifecycle, so that a node is failed as soon as either (a) a leader detects one of its followers disconnecting, or (b) a follower detects its leader disconnecting.
1 parent bfd24fc commit 6d6ac74

File tree

7 files changed

+271
-99
lines changed

7 files changed

+271
-99
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
107107
private Releasable electionScheduler;
108108
@Nullable
109109
private Releasable prevotingRound;
110-
@Nullable
111-
private Releasable leaderCheckScheduler;
112110
private long maxTermSeen;
113111
private final Reconfigurator reconfigurator;
114112

@@ -140,7 +138,7 @@ public Coordinator(Settings settings, ClusterSettings clusterSettings, Transport
140138
this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest,
141139
this::handleApplyCommit);
142140
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
143-
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
141+
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
144142
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
145143
this.clusterApplier = clusterApplier;
146144
masterService.setClusterStateSupplier(this::getStateForMasterService);
@@ -163,11 +161,11 @@ public String toString() {
163161
};
164162
}
165163

166-
private void onFollowerFailure(DiscoveryNode discoveryNode) {
164+
private void removeNode(DiscoveryNode discoveryNode, String reason) {
167165
synchronized (mutex) {
168166
if (mode == Mode.LEADER) {
169167
masterService.submitStateUpdateTask("node-left",
170-
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"),
168+
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
171169
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
172170
nodeRemovalExecutor,
173171
nodeRemovalExecutor);
@@ -358,11 +356,7 @@ void becomeCandidate(String method) {
358356

359357
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
360358
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
361-
362-
if (leaderCheckScheduler != null) {
363-
leaderCheckScheduler.close();
364-
leaderCheckScheduler = null;
365-
}
359+
leaderChecker.updateLeader(null);
366360

367361
followersChecker.clearCurrentNodes();
368362
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
@@ -391,7 +385,7 @@ void becomeLeader(String method) {
391385
closePrevotingAndElectionScheduler();
392386
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
393387

394-
assert leaderCheckScheduler == null : leaderCheckScheduler;
388+
assert leaderChecker.leader() == null : leaderChecker.leader();
395389
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
396390
}
397391

@@ -415,10 +409,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
415409
preVoteCollector.update(getPreVoteResponse(), leaderNode);
416410

417411
if (restartLeaderChecker) {
418-
if (leaderCheckScheduler != null) {
419-
leaderCheckScheduler.close();
420-
}
421-
leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
412+
leaderChecker.updateLeader(leaderNode);
422413
}
423414

424415
followersChecker.clearCurrentNodes();
@@ -515,7 +506,7 @@ public void invariant() {
515506
assert electionScheduler == null : electionScheduler;
516507
assert prevotingRound == null : prevotingRound;
517508
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
518-
assert leaderCheckScheduler == null : leaderCheckScheduler;
509+
assert leaderChecker.leader() == null : leaderChecker.leader();
519510
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
520511
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
521512

@@ -553,7 +544,7 @@ public void invariant() {
553544
assert prevotingRound == null : prevotingRound;
554545
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
555546
assert leaderChecker.currentNodeIsMaster() == false;
556-
assert leaderCheckScheduler != null;
547+
assert lastKnownLeader.equals(Optional.of(leaderChecker.leader()));
557548
assert followersChecker.getKnownFollowers().isEmpty();
558549
assert currentPublication.map(Publication::isCommitted).orElse(true);
559550
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
@@ -564,7 +555,7 @@ public void invariant() {
564555
assert prevotingRound == null || electionScheduler != null;
565556
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
566557
assert leaderChecker.currentNodeIsMaster() == false;
567-
assert leaderCheckScheduler == null : leaderCheckScheduler;
558+
assert leaderChecker.leader() == null : leaderChecker.leader();
568559
assert followersChecker.getKnownFollowers().isEmpty();
569560
assert applierState.nodes().getMasterNodeId() == null;
570561
assert currentPublication.map(Publication::isCommitted).orElse(true);

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.threadpool.ThreadPool.Names;
3434
import org.elasticsearch.transport.ConnectTransportException;
3535
import org.elasticsearch.transport.TransportChannel;
36+
import org.elasticsearch.transport.TransportConnectionListener;
3637
import org.elasticsearch.transport.TransportException;
3738
import org.elasticsearch.transport.TransportRequest;
3839
import org.elasticsearch.transport.TransportRequestOptions;
@@ -46,6 +47,7 @@
4647
import java.util.Map;
4748
import java.util.Objects;
4849
import java.util.Set;
50+
import java.util.function.BiConsumer;
4951
import java.util.function.Consumer;
5052
import java.util.function.Predicate;
5153

@@ -78,7 +80,7 @@ public class FollowersChecker extends AbstractComponent {
7880
private final TimeValue followerCheckInterval;
7981
private final TimeValue followerCheckTimeout;
8082
private final int followerCheckRetryCount;
81-
private final Consumer<DiscoveryNode> onNodeFailure;
83+
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
8284
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;
8385

8486
private final Object mutex = new Object(); // protects writes to this state; read access does not need sync
@@ -91,7 +93,7 @@ public class FollowersChecker extends AbstractComponent {
9193

9294
public FollowersChecker(Settings settings, TransportService transportService,
9395
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
94-
Consumer<DiscoveryNode> onNodeFailure) {
96+
BiConsumer<DiscoveryNode, String> onNodeFailure) {
9597
super(settings);
9698
this.transportService = transportService;
9799
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
@@ -104,6 +106,12 @@ public FollowersChecker(Settings settings, TransportService transportService,
104106
updateFastResponseState(0, Mode.CANDIDATE);
105107
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new,
106108
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
109+
transportService.addConnectionListener(new TransportConnectionListener() {
110+
@Override
111+
public void onNodeDisconnected(DiscoveryNode node) {
112+
handleDisconnectedNode(node);
113+
}
114+
});
107115
}
108116

109117
/**
@@ -228,6 +236,15 @@ Set<DiscoveryNode> getKnownFollowers() {
228236
}
229237
}
230238

239+
private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
240+
synchronized (mutex) {
241+
FollowerChecker followerChecker = followerCheckers.get(discoveryNode);
242+
if (followerChecker != null && followerChecker.running()) {
243+
followerChecker.failNode("disconnected");
244+
}
245+
}
246+
}
247+
231248
static class FastResponseState {
232249
final long term;
233250
final Mode mode;
@@ -303,36 +320,21 @@ public void handleException(TransportException exp) {
303320

304321
failureCountSinceLastSuccess++;
305322

323+
final String reason;
306324
if (failureCountSinceLastSuccess >= followerCheckRetryCount) {
307325
logger.debug(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp);
326+
reason = "followers check retry count exceeded";
308327
} else if (exp instanceof ConnectTransportException
309328
|| exp.getCause() instanceof ConnectTransportException) {
310329
logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
330+
reason = "disconnected";
311331
} else {
312332
logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
313333
scheduleNextWakeUp();
314334
return;
315335
}
316336

317-
transportService.getThreadPool().generic().execute(new Runnable() {
318-
@Override
319-
public void run() {
320-
synchronized (mutex) {
321-
if (running() == false) {
322-
logger.debug("{} no longer running, not marking faulty", FollowerChecker.this);
323-
return;
324-
}
325-
faultyNodes.add(discoveryNode);
326-
followerCheckers.remove(discoveryNode);
327-
}
328-
onNodeFailure.accept(discoveryNode);
329-
}
330-
331-
@Override
332-
public String toString() {
333-
return "detected failure of " + discoveryNode;
334-
}
335-
});
337+
failNode(reason);
336338
}
337339

338340

@@ -343,6 +345,28 @@ public String executor() {
343345
});
344346
}
345347

348+
void failNode(String reason) {
349+
transportService.getThreadPool().generic().execute(new Runnable() {
350+
@Override
351+
public void run() {
352+
synchronized (mutex) {
353+
if (running() == false) {
354+
logger.debug("{} condition no longer applies, not marking faulty", discoveryNode);
355+
return;
356+
}
357+
faultyNodes.add(discoveryNode);
358+
followerCheckers.remove(discoveryNode);
359+
}
360+
onNodeFailure.accept(discoveryNode, reason);
361+
}
362+
363+
@Override
364+
public String toString() {
365+
return "detected failure of " + discoveryNode;
366+
}
367+
});
368+
}
369+
346370
private void scheduleNextWakeUp() {
347371
transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() {
348372
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.cluster.node.DiscoveryNodes;
25+
import org.elasticsearch.common.Nullable;
2526
import org.elasticsearch.common.component.AbstractComponent;
2627
import org.elasticsearch.common.io.stream.StreamInput;
2728
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -33,6 +34,7 @@
3334
import org.elasticsearch.threadpool.ThreadPool.Names;
3435
import org.elasticsearch.transport.ConnectTransportException;
3536
import org.elasticsearch.transport.TransportChannel;
37+
import org.elasticsearch.transport.TransportConnectionListener;
3638
import org.elasticsearch.transport.TransportException;
3739
import org.elasticsearch.transport.TransportRequest;
3840
import org.elasticsearch.transport.TransportRequestOptions;
@@ -46,6 +48,7 @@
4648
import java.util.Objects;
4749
import java.util.concurrent.atomic.AtomicBoolean;
4850
import java.util.concurrent.atomic.AtomicLong;
51+
import java.util.concurrent.atomic.AtomicReference;
4952

5053
/**
5154
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
@@ -77,6 +80,8 @@ public class LeaderChecker extends AbstractComponent {
7780
private final TransportService transportService;
7881
private final Runnable onLeaderFailure;
7982

83+
private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
84+
8085
private volatile DiscoveryNodes discoveryNodes;
8186

8287
public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
@@ -88,19 +93,39 @@ public LeaderChecker(final Settings settings, final TransportService transportSe
8893
this.onLeaderFailure = onLeaderFailure;
8994

9095
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
96+
transportService.addConnectionListener(new TransportConnectionListener() {
97+
@Override
98+
public void onNodeDisconnected(DiscoveryNode node) {
99+
handleDisconnectedNode(node);
100+
}
101+
});
102+
}
103+
104+
public DiscoveryNode leader() {
105+
CheckScheduler checkScheduler = currentChecker.get();
106+
return checkScheduler == null ? null : checkScheduler.leader;
91107
}
92108

93109
/**
94-
* Start a leader checker for the given leader. Should only be called after successfully joining this leader.
110+
* Starts and/or stops a leader checker for the given leader. When a non-null leader is given, this should only be called after successfully joining that leader.
95111
*
96-
* @param leader the node to be checked as leader
97-
* @return a `Releasable` that can be used to stop this checker.
112+
* @param leader the node to be checked as leader, or null if checks should be disabled
98113
*/
99-
public Releasable startLeaderChecker(final DiscoveryNode leader) {
114+
public void updateLeader(@Nullable final DiscoveryNode leader) {
100115
assert transportService.getLocalNode().equals(leader) == false;
101-
CheckScheduler checkScheduler = new CheckScheduler(leader);
102-
checkScheduler.handleWakeUp();
103-
return checkScheduler;
116+
final CheckScheduler checkScheduler;
117+
if (leader != null) {
118+
checkScheduler = new CheckScheduler(leader);
119+
} else {
120+
checkScheduler = null;
121+
}
122+
CheckScheduler previousChecker = currentChecker.getAndSet(checkScheduler);
123+
if (previousChecker != null) {
124+
previousChecker.close();
125+
}
126+
if (checkScheduler != null) {
127+
checkScheduler.handleWakeUp();
128+
}
104129
}
105130

106131
/**
@@ -137,6 +162,15 @@ private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel tran
137162
}
138163
}
139164

165+
private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
166+
CheckScheduler checkScheduler = currentChecker.get();
167+
if (checkScheduler != null) {
168+
checkScheduler.handleDisconnectedNode(discoveryNode);
169+
} else {
170+
logger.trace("disconnect event ignored for {}, no check scheduler", discoveryNode);
171+
}
172+
}
173+
140174
private class CheckScheduler implements Releasable {
141175

142176
private final AtomicBoolean isClosed = new AtomicBoolean();
@@ -222,14 +256,20 @@ public String executor() {
222256
});
223257
}
224258

225-
private void leaderFailed() {
259+
void leaderFailed() {
226260
if (isClosed.compareAndSet(false, true)) {
227261
transportService.getThreadPool().generic().execute(onLeaderFailure);
228262
} else {
229263
logger.debug("already closed, not failing leader");
230264
}
231265
}
232266

267+
void handleDisconnectedNode(DiscoveryNode discoveryNode) {
268+
if (discoveryNode.equals(leader)) {
269+
leaderFailed();
270+
}
271+
}
272+
233273
private void scheduleNextWakeUp() {
234274
logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval);
235275
transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() {

0 commit comments

Comments
 (0)