Skip to content

Commit 1d47c95

Browse files
authored
Fix CoordinatorTests (#34002)
Today the CoordinatorTests are not very reliable when two elections are scheduled concurrently. Occasional failures are expected in that situation, but in practice they are much more common than they should be because of a handful of issues. This PR fixes those issues.
1 parent 02b483c commit 1d47c95

File tree

13 files changed

+121
-13
lines changed

13 files changed

+121
-13
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@
410410
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilderTests.java" checks="LineLength" />
411411
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
412412
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
413-
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
414413
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
415414
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]TransportInstanceSingleOperationActionTests.java" checks="LineLength" />
416415
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]AbstractTermVectorsTestCase.java" checks="LineLength" />

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,19 @@ public long globalCheckpoint() {
10861086
return globalCheckpoint;
10871087
}
10881088

1089+
@Override
1090+
public boolean equals(Object o) {
1091+
if (this == o) return true;
1092+
if (o == null || getClass() != o.getClass()) return false;
1093+
ReplicaResponse that = (ReplicaResponse) o;
1094+
return localCheckpoint == that.localCheckpoint &&
1095+
globalCheckpoint == that.globalCheckpoint;
1096+
}
1097+
1098+
@Override
1099+
public int hashCode() {
1100+
return Objects.hash(localCheckpoint, globalCheckpoint);
1101+
}
10891102
}
10901103

10911104
/**

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
123123
}
124124

125125
PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
126+
assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode()) :
127+
publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();
128+
126129
synchronized (mutex) {
127130
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
128131
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
@@ -564,7 +567,9 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
564567
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
565568
@Override
566569
public void run() {
567-
publication.onTimeout();
570+
synchronized (mutex) {
571+
publication.onTimeout();
572+
}
568573
}
569574

570575
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ private void handleWakeUp() {
256256
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
257257
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
258258
new TransportResponseHandler<Empty>() {
259+
@Override
260+
public Empty read(StreamInput in) {
261+
return Empty.INSTANCE;
262+
}
263+
259264
@Override
260265
public void handleResponse(Empty response) {
261266
if (running() == false) {

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.util.HashMap;
43+
import java.util.LinkedHashMap;
4344
import java.util.List;
4445
import java.util.Map;
4546
import java.util.Optional;
@@ -263,7 +264,7 @@ public void close(Mode newMode) {
263264
assert closed == false : "CandidateJoinAccumulator closed";
264265
closed = true;
265266
if (newMode == Mode.LEADER) {
266-
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new HashMap<>();
267+
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new LinkedHashMap<>();
267268
joinRequestAccumulator.forEach((key, value) -> {
268269
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(key, "elect leader");
269270
pendingAsTasks.put(task, new JoinTaskListener(task, value));

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,12 @@ void handleWakeUp() {
167167
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
168168

169169
new TransportResponseHandler<TransportResponse.Empty>() {
170+
171+
@Override
172+
public Empty read(StreamInput in) {
173+
return Empty.INSTANCE;
174+
}
175+
170176
@Override
171177
public void handleResponse(Empty response) {
172178
if (isClosed.get()) {

server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
import org.elasticsearch.common.Nullable;
2727
import org.elasticsearch.common.collect.Tuple;
2828
import org.elasticsearch.common.component.AbstractComponent;
29+
import org.elasticsearch.common.io.stream.StreamInput;
2930
import org.elasticsearch.common.lease.Releasable;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.threadpool.ThreadPool.Names;
3233
import org.elasticsearch.transport.TransportException;
3334
import org.elasticsearch.transport.TransportResponseHandler;
3435
import org.elasticsearch.transport.TransportService;
3536

37+
import java.io.IOException;
3638
import java.util.Set;
3739
import java.util.concurrent.atomic.AtomicBoolean;
3840
import java.util.function.LongConsumer;
@@ -127,6 +129,11 @@ void start(final Iterable<DiscoveryNode> broadcastNodes) {
127129
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
128130
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
129131
new TransportResponseHandler<PreVoteResponse>() {
132+
@Override
133+
public PreVoteResponse read(StreamInput in) throws IOException {
134+
return new PreVoteResponse(in);
135+
}
136+
130137
@Override
131138
public void handleResponse(PreVoteResponse response) {
132139
handlePreVoteResponse(response, n);

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ public void start(Set<DiscoveryNode> faultyNodes) {
7070
}
7171

7272
public void onTimeout() {
73+
if (isCompleted) {
74+
return;
75+
}
76+
7377
assert timedOut == false;
7478
timedOut = true;
7579
if (applyCommitRequest.isPresent() == false) {

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.elasticsearch.cluster.node.DiscoveryNode;
3939
import org.elasticsearch.cluster.node.DiscoveryNodes;
4040
import org.elasticsearch.cluster.service.ClusterService;
41+
import org.elasticsearch.common.io.stream.StreamInput;
42+
import org.elasticsearch.common.io.stream.StreamOutput;
4143
import org.elasticsearch.common.settings.Settings;
4244
import org.elasticsearch.common.unit.TimeValue;
4345
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@@ -55,8 +57,10 @@
5557
import org.junit.Before;
5658
import org.junit.BeforeClass;
5759

60+
import java.io.IOException;
5861
import java.util.Collections;
5962
import java.util.HashSet;
63+
import java.util.Objects;
6064
import java.util.Set;
6165
import java.util.concurrent.ExecutionException;
6266
import java.util.concurrent.TimeUnit;
@@ -127,10 +131,38 @@ public ActionRequestValidationException validate() {
127131
}
128132
}
129133

130-
class Response extends ActionResponse {}
134+
class Response extends ActionResponse {
135+
private long identity = randomLong();
136+
137+
@Override
138+
public boolean equals(Object o) {
139+
if (this == o) return true;
140+
if (o == null || getClass() != o.getClass()) return false;
141+
Response response = (Response) o;
142+
return identity == response.identity;
143+
}
144+
145+
@Override
146+
public int hashCode() {
147+
return Objects.hash(identity);
148+
}
149+
150+
@Override
151+
public void writeTo(StreamOutput out) throws IOException {
152+
super.writeTo(out);
153+
out.writeLong(identity);
154+
}
155+
156+
@Override
157+
public void readFrom(StreamInput in) throws IOException {
158+
super.readFrom(in);
159+
identity = in.readLong();
160+
}
161+
}
131162

132163
class Action extends TransportMasterNodeAction<Request, Response> {
133-
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
164+
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
165+
ThreadPool threadPool) {
134166
super(settings, actionName, transportService, clusterService, threadPool,
135167
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new);
136168
}
@@ -176,7 +208,7 @@ public void testLocalOperationWithoutBlocks() throws ExecutionException, Interru
176208

177209
new Action(Settings.EMPTY, "internal:testAction", transportService, clusterService, threadPool) {
178210
@Override
179-
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
211+
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
180212
if (masterOperationFailure) {
181213
listener.onFailure(exception);
182214
} else {

test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.elasticsearch.cluster.ClusterModule;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2426
import org.elasticsearch.test.transport.MockTransport;
2527
import org.elasticsearch.transport.ConnectTransportException;
2628
import org.elasticsearch.transport.RequestHandlerRegistry;
@@ -29,8 +31,11 @@
2931
import org.elasticsearch.transport.TransportResponse;
3032
import org.elasticsearch.transport.TransportResponseOptions;
3133

34+
import java.io.IOException;
3235
import java.util.Optional;
3336

37+
import static org.elasticsearch.test.ESTestCase.copyWriteable;
38+
3439
public abstract class DisruptableMockTransport extends MockTransport {
3540
private final Logger logger;
3641

@@ -68,7 +73,6 @@ public String toString() {
6873
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
6974

7075
assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
71-
super.onSendRequest(requestId, action, request, destination);
7276

7377
final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}",
7478
action, requestId, getLocalNode(), destination).getFormattedMessage();
@@ -162,8 +166,15 @@ public String toString() {
162166
}
163167
};
164168

169+
final TransportRequest copiedRequest;
170+
try {
171+
copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest);
172+
} catch (IOException e) {
173+
throw new AssertionError("exception de/serializing request", e);
174+
}
175+
165176
try {
166-
requestHandler.processMessageReceived(request, transportChannel);
177+
requestHandler.processMessageReceived(copiedRequest, transportChannel);
167178
} catch (Exception e) {
168179
try {
169180
transportChannel.sendResponse(e);
@@ -181,6 +192,10 @@ public String toString() {
181192
});
182193
}
183194

195+
private NamedWriteableRegistry writeableRegistry() {
196+
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
197+
}
198+
184199
public enum ConnectionStatus {
185200
CONNECTED,
186201
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException

0 commit comments

Comments
 (0)