Skip to content

Commit a68a464

Browse files
authored
[Zen2] Add lag detector (#35685)
A publication can succeed and complete before all nodes have applied the published state and acknowledged it, thanks to the publication timeout; however we need every node eventually either to apply the published state (or a later state) or be removed from the cluster. This change introduces the LagDetector which achieves this liveness property by removing any lagging nodes from the cluster.
1 parent f47636b commit a68a464

File tree

6 files changed

+436
-38
lines changed

6 files changed

+436
-38
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
119119
private long maxTermSeen;
120120
private final Reconfigurator reconfigurator;
121121
private final ClusterBootstrapService clusterBootstrapService;
122+
private final LagDetector lagDetector;
122123

123124
private Mode mode;
124125
private Optional<DiscoveryNode> lastKnownLeader;
@@ -157,6 +158,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
157158
masterService.setClusterStateSupplier(this::getStateForMasterService);
158159
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
159160
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
161+
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
162+
transportService::getLocalNode);
160163
}
161164

162165
private Runnable getOnLeaderFailure() {
@@ -374,6 +377,7 @@ void becomeCandidate(String method) {
374377

375378
followersChecker.clearCurrentNodes();
376379
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
380+
lagDetector.clearTrackedNodes();
377381

378382
if (applierState.nodes().getMasterNodeId() != null) {
379383
applierState = clusterStateWithNoMasterBlock(applierState);
@@ -428,6 +432,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
428432

429433
followersChecker.clearCurrentNodes();
430434
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
435+
lagDetector.clearTrackedNodes();
431436
}
432437

433438
private PreVoteResponse getPreVoteResponse() {
@@ -512,6 +517,11 @@ public void invariant() {
512517
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
513518
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
514519
: preVoteCollector + " vs " + getPreVoteResponse();
520+
521+
assert lagDetector.getTrackedNodes().contains(getLocalNode()) == false : lagDetector.getTrackedNodes();
522+
assert followersChecker.getKnownFollowers().equals(lagDetector.getTrackedNodes())
523+
: followersChecker.getKnownFollowers() + " vs " + lagDetector.getTrackedNodes();
524+
515525
if (mode == Mode.LEADER) {
516526
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
517527

@@ -831,8 +841,10 @@ public String toString() {
831841
}
832842
});
833843

834-
leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
835-
followersChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
844+
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
845+
leaderChecker.setCurrentNodes(publishNodes);
846+
followersChecker.setCurrentNodes(publishNodes);
847+
lagDetector.setTrackedNodes(publishNodes);
836848
publication.start(followersChecker.getFaultyNodes());
837849
}
838850
} catch (Exception e) {
@@ -985,6 +997,9 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
985997
}
986998
} else {
987999
ackListener.onNodeAck(node, e);
1000+
if (e == null) {
1001+
lagDetector.setAppliedVersion(node, publishRequest.getAcceptedState().version());
1002+
}
9881003
}
9891004
}
9901005
},
@@ -1051,6 +1066,7 @@ public void onSuccess(String source) {
10511066
if (mode == Mode.LEADER) {
10521067
scheduleReconfigurationIfNeeded();
10531068
}
1069+
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
10541070
}
10551071
ackListener.onNodeAck(getLocalNode(), null);
10561072
publishListener.onResponse(null);
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
24+
import org.elasticsearch.common.settings.Setting;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.threadpool.ThreadPool.Names;
29+
30+
import java.util.Collections;
31+
import java.util.HashSet;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.concurrent.atomic.AtomicLong;
36+
import java.util.function.Consumer;
37+
import java.util.function.Supplier;
38+
import java.util.stream.Collectors;
39+
40+
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
41+
42+
/**
43+
* A publication can succeed and complete before all nodes have applied the published state and acknowledged it; however we need every node
44+
* eventually either to apply the published state (or a later state) or be removed from the cluster. This component achieves this by
45+
* removing any lagging nodes from the cluster after a timeout.
46+
*/
47+
public class LagDetector {
48+
49+
private static final Logger logger = LogManager.getLogger(LagDetector.class);
50+
51+
// the timeout for each node to apply a cluster state update after the leader has applied it, before being removed from the cluster
52+
public static final Setting<TimeValue> CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING =
53+
Setting.timeSetting("cluster.follower_lag.timeout",
54+
TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
55+
56+
private final TimeValue clusterStateApplicationTimeout;
57+
private final Consumer<DiscoveryNode> onLagDetected;
58+
private final Supplier<DiscoveryNode> localNodeSupplier;
59+
private final ThreadPool threadPool;
60+
private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();
61+
62+
public LagDetector(final Settings settings, final ThreadPool threadPool, final Consumer<DiscoveryNode> onLagDetected,
63+
final Supplier<DiscoveryNode> localNodeSupplier) {
64+
this.threadPool = threadPool;
65+
this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
66+
this.onLagDetected = onLagDetected;
67+
this.localNodeSupplier = localNodeSupplier;
68+
}
69+
70+
public void setTrackedNodes(final Iterable<DiscoveryNode> discoveryNodes) {
71+
final Set<DiscoveryNode> discoveryNodeSet = new HashSet<>();
72+
discoveryNodes.forEach(discoveryNodeSet::add);
73+
discoveryNodeSet.remove(localNodeSupplier.get());
74+
appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet);
75+
discoveryNodeSet.forEach(node -> appliedStateTrackersByNode.putIfAbsent(node, new NodeAppliedStateTracker(node)));
76+
}
77+
78+
public void clearTrackedNodes() {
79+
appliedStateTrackersByNode.clear();
80+
}
81+
82+
public void setAppliedVersion(final DiscoveryNode discoveryNode, final long appliedVersion) {
83+
final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode);
84+
if (nodeAppliedStateTracker == null) {
85+
// Received an ack from a node that a later publication has removed (or we are no longer master). No big deal.
86+
logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion);
87+
} else {
88+
nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
89+
}
90+
}
91+
92+
public void startLagDetector(final long version) {
93+
final List<NodeAppliedStateTracker> laggingTrackers
94+
= appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList());
95+
96+
if (laggingTrackers.isEmpty()) {
97+
logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values());
98+
} else {
99+
logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);
100+
101+
threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() {
102+
@Override
103+
public void run() {
104+
laggingTrackers.forEach(t -> t.checkForLag(version));
105+
}
106+
107+
@Override
108+
public String toString() {
109+
return "lag detector for version " + version + " on " + laggingTrackers;
110+
}
111+
});
112+
}
113+
}
114+
115+
@Override
116+
public String toString() {
117+
return "LagDetector{" +
118+
"clusterStateApplicationTimeout=" + clusterStateApplicationTimeout +
119+
", appliedStateTrackersByNode=" + appliedStateTrackersByNode.values() +
120+
'}';
121+
}
122+
123+
// for assertions
124+
Set<DiscoveryNode> getTrackedNodes() {
125+
return Collections.unmodifiableSet(appliedStateTrackersByNode.keySet());
126+
}
127+
128+
private class NodeAppliedStateTracker {
129+
private final DiscoveryNode discoveryNode;
130+
private final AtomicLong appliedVersion = new AtomicLong();
131+
132+
NodeAppliedStateTracker(final DiscoveryNode discoveryNode) {
133+
this.discoveryNode = discoveryNode;
134+
}
135+
136+
void increaseAppliedVersion(long appliedVersion) {
137+
long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion));
138+
logger.trace("{} applied version {}, max now {}", this, appliedVersion, maxAppliedVersion);
139+
}
140+
141+
boolean appliedVersionLessThan(final long version) {
142+
return appliedVersion.get() < version;
143+
}
144+
145+
@Override
146+
public String toString() {
147+
return "NodeAppliedStateTracker{" +
148+
"discoveryNode=" + discoveryNode +
149+
", appliedVersion=" + appliedVersion +
150+
'}';
151+
}
152+
153+
void checkForLag(final long version) {
154+
if (appliedStateTrackersByNode.get(discoveryNode) != this) {
155+
logger.trace("{} no longer active when checking version {}", this, version);
156+
return;
157+
}
158+
159+
long appliedVersion = this.appliedVersion.get();
160+
if (version <= appliedVersion) {
161+
logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion);
162+
return;
163+
}
164+
165+
logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion);
166+
onLagDetected.accept(discoveryNode);
167+
}
168+
}
169+
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.NodeConnectionsService;
3434
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
3535
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
36+
import org.elasticsearch.cluster.coordination.LagDetector;
3637
import org.elasticsearch.cluster.coordination.Coordinator;
3738
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
3839
import org.elasticsearch.cluster.coordination.FollowersChecker;
@@ -469,7 +470,8 @@ public void apply(Settings value, Settings current, Settings previous) {
469470
LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING,
470471
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
471472
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
472-
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING
473+
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING,
474+
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING
473475
)));
474476

475477
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {
599599

600600
assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
601601
assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
602-
cluster.stabilise();
602+
cluster.stabilise(defaultMillis(PUBLISH_TIMEOUT_SETTING));
603603
assertTrue("expected eventual ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
604604
assertFalse("expected no ack from " + follower0, ackCollector.hasAcked(follower0));
605605
}
@@ -1101,7 +1101,6 @@ void stabilise(long stabilisationDurationMillis) {
11011101
}
11021102

11031103
runFor(stabilisationDurationMillis, "stabilising");
1104-
fixLag();
11051104

11061105
final ClusterNode leader = getAnyLeader();
11071106
final long leaderTerm = leader.coordinator.getCurrentTerm();
@@ -1158,35 +1157,6 @@ void stabilise(long stabilisationDurationMillis) {
11581157
leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState));
11591158
}
11601159

1161-
// TODO remove this when lag detection is implemented
1162-
void fixLag() {
1163-
final ClusterNode leader = getAnyLeader();
1164-
final long leaderVersion = leader.getLastAppliedClusterState().version();
1165-
final long minVersion = clusterNodes.stream()
1166-
.filter(n -> isConnectedPair(n, leader))
1167-
.map(n -> n.getLastAppliedClusterState().version()).min(Long::compare).orElse(Long.MIN_VALUE);
1168-
assert minVersion >= 0;
1169-
if (minVersion < leaderVersion) {
1170-
logger.info("--> fixLag publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion);
1171-
onNode(leader.getLocalNode(), () -> {
1172-
synchronized (leader.coordinator.mutex) {
1173-
leader.submitValue(randomLong());
1174-
}
1175-
}).run();
1176-
1177-
runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY
1178-
// may need to bump terms too
1179-
+ DEFAULT_ELECTION_DELAY,
1180-
"re-stabilising after lag-fixing publication");
1181-
1182-
if (clusterNodes.stream().anyMatch(n -> n.getClusterStateApplyResponse().equals(ClusterStateApplyResponse.HANG))) {
1183-
runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING), "allowing lag-fixing publication to time out");
1184-
}
1185-
} else {
1186-
logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion);
1187-
}
1188-
}
1189-
11901160
void runFor(long runDurationMillis, String description) {
11911161
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
11921162
logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);

0 commit comments

Comments
 (0)