diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index fc22e2172a000..99c2ee29f6063 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -410,7 +410,6 @@
-
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index dbdd5acae1fc6..e5a23e03ce801 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -1086,6 +1086,19 @@ public long globalCheckpoint() {
return globalCheckpoint;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ReplicaResponse that = (ReplicaResponse) o;
+ return localCheckpoint == that.localCheckpoint &&
+ globalCheckpoint == that.globalCheckpoint;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(localCheckpoint, globalCheckpoint);
+ }
}
/**
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index 2b112102a562d..73d0906330971 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -122,6 +122,9 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
}
PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
+ assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode()) :
+ publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();
+
synchronized (mutex) {
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
@@ -560,7 +563,9 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@Override
public void run() {
- publication.onTimeout();
+ synchronized (mutex) {
+ publication.onTimeout();
+ }
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java
index 1407606d16892..4300787d39699 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java
@@ -256,6 +256,11 @@ private void handleWakeUp() {
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler() {
+ @Override
+ public Empty read(StreamInput in) {
+ return Empty.INSTANCE;
+ }
+
@Override
public void handleResponse(Empty response) {
if (running() == false) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
index bc01612b899bc..bfdc9eedaf488 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
@@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -251,7 +252,7 @@ public void close(Mode newMode) {
assert closed == false : "CandidateJoinAccumulator closed";
closed = true;
if (newMode == Mode.LEADER) {
- final Map pendingAsTasks = new HashMap<>();
+ final Map pendingAsTasks = new LinkedHashMap<>();
joinRequestAccumulator.forEach((key, value) -> {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(key, "elect leader");
pendingAsTasks.put(task, new JoinTaskListener(task, value));
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
index 586a151b9e43d..2dd98643b2dfc 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
@@ -167,6 +167,12 @@ void handleWakeUp() {
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler() {
+
+ @Override
+ public Empty read(StreamInput in) {
+ return Empty.INSTANCE;
+ }
+
@Override
public void handleResponse(Empty response) {
if (isClosed.get()) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java
index b9aed719af404..c3417c17f8f53 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java
@@ -26,6 +26,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool.Names;
@@ -33,6 +34,7 @@
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
@@ -127,6 +129,11 @@ void start(final Iterable broadcastNodes) {
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
new TransportResponseHandler() {
+ @Override
+ public PreVoteResponse read(StreamInput in) throws IOException {
+ return new PreVoteResponse(in);
+ }
+
@Override
public void handleResponse(PreVoteResponse response) {
handlePreVoteResponse(response, n);
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
index f95bfd223d5c6..d994da2dbe0b9 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
@@ -70,6 +70,10 @@ public void start(Set faultyNodes) {
}
public void onTimeout() {
+ if (isCompleted) {
+ return;
+ }
+
assert timedOut == false;
timedOut = true;
if (applyCommitRequest.isPresent() == false) {
diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java
index f983efde72e4a..f711fd20633df 100644
--- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java
@@ -38,6 +38,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@@ -55,8 +57,10 @@
import org.junit.Before;
import org.junit.BeforeClass;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -127,10 +131,38 @@ public ActionRequestValidationException validate() {
}
}
- class Response extends ActionResponse {}
+ class Response extends ActionResponse {
+ private long identity = randomLong();
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Response response = (Response) o;
+ return identity == response.identity;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identity);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeLong(identity);
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ super.readFrom(in);
+ identity = in.readLong();
+ }
+ }
class Action extends TransportMasterNodeAction {
- Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
+ Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
+ ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, threadPool,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new);
}
@@ -176,7 +208,7 @@ public void testLocalOperationWithoutBlocks() throws ExecutionException, Interru
new Action(Settings.EMPTY, "internal:testAction", transportService, clusterService, threadPool) {
@Override
- protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception {
+ protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) {
if (masterOperationFailure) {
listener.onFailure(exception);
} else {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java
index ccf4c34d7c299..c6139f4e73329 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java
@@ -20,7 +20,9 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
@@ -29,8 +31,11 @@
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
+import java.io.IOException;
import java.util.Optional;
+import static org.elasticsearch.test.ESTestCase.copyWriteable;
+
public abstract class DisruptableMockTransport extends MockTransport {
private final Logger logger;
@@ -68,7 +73,6 @@ public String toString() {
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
- super.onSendRequest(requestId, action, request, destination);
final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}",
action, requestId, getLocalNode(), destination).getFormattedMessage();
@@ -162,8 +166,15 @@ public String toString() {
}
};
+ final TransportRequest copiedRequest;
+ try {
+ copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest);
+ } catch (IOException e) {
+ throw new AssertionError("exception de/serializing request", e);
+ }
+
try {
- requestHandler.processMessageReceived(request, transportChannel);
+ requestHandler.processMessageReceived(copiedRequest, transportChannel);
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
@@ -181,6 +192,10 @@ public String toString() {
});
}
+ private NamedWriteableRegistry writeableRegistry() {
+ return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
+ }
+
public enum ConnectionStatus {
CONNECTED,
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
index c827f9b83b43f..de2f41a5b3c3e 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
@@ -111,7 +111,6 @@ public void clear() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
- super.onSendRequest(requestId, action, request, node);
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request));
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
index 1fb942f857ace..d642798d688ae 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
@@ -19,7 +19,6 @@
package org.elasticsearch.test.transport;
-import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
@@ -91,7 +90,14 @@ public TransportService createTransportService(Settings settings, ThreadPool thr
public void handleResponse(final long requestId, final TransportResponse response) {
final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
if (transportResponseHandler != null) {
- transportResponseHandler.handleResponse(response);
+ final TransportResponse deliveredResponse;
+ try (BytesStreamOutput output = new BytesStreamOutput()) {
+ response.writeTo(output);
+ deliveredResponse = transportResponseHandler.read(output.bytes().streamInput());
+ } catch (IOException | UnsupportedOperationException e) {
+ throw new AssertionError("failed to serialize/deserialize response " + response, e);
+ }
+ transportResponseHandler.handleResponse(deliveredResponse);
}
}
@@ -126,7 +132,7 @@ public void handleRemoteError(final long requestId, final Throwable t) {
output.writeException(t);
remoteException = new RemoteTransportException("remote failure", output.bytes().streamInput().readException());
} catch (IOException ioException) {
- throw new ElasticsearchException("failed to serialize/deserialize supplied exception " + t, ioException);
+ throw new AssertionError("failed to serialize/deserialize supplied exception " + t, ioException);
}
}
this.handleError(requestId, remoteException);
@@ -181,7 +187,6 @@ public void close() {
}
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
-
}
protected boolean nodeConnected(DiscoveryNode discoveryNode) {
diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java
index 3916af6c1b9c3..e49b7d06524ca 100644
--- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java
+++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
@@ -33,6 +34,7 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
@@ -191,6 +193,11 @@ private TransportRequestHandler requestHandlerCaptures(C
private TransportResponseHandler responseHandlerShouldNotBeCalled() {
return new TransportResponseHandler() {
+ @Override
+ public TransportResponse read(StreamInput in) {
+ throw new AssertionError("should not be called");
+ }
+
@Override
public void handleResponse(TransportResponse response) {
throw new AssertionError("should not be called");
@@ -210,6 +217,11 @@ public String executor() {
private TransportResponseHandler responseHandlerShouldBeCalledNormally(Runnable onCalled) {
return new TransportResponseHandler() {
+ @Override
+ public TransportResponse read(StreamInput in) {
+ return Empty.INSTANCE;
+ }
+
@Override
public void handleResponse(TransportResponse response) {
onCalled.run();
@@ -229,6 +241,11 @@ public String executor() {
private TransportResponseHandler responseHandlerShouldBeCalledExceptionally(Consumer onCalled) {
return new TransportResponseHandler() {
+ @Override
+ public TransportResponse read(StreamInput in) {
+ throw new AssertionError("should not be called");
+ }
+
@Override
public void handleResponse(TransportResponse response) {
throw new AssertionError("should not be called");