Skip to content

Commit defcc57

Browse files
committed
No need to fix up ClusterState, it should be done by the transport layer
1 parent 01645e0 commit defcc57

File tree

4 files changed

+19
-7
lines changed

4 files changed

+19
-7
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,8 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
122122
}
123123

124124
PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
125-
// overwrite local node
126-
publishRequest = new PublishRequest(ClusterState.builder(publishRequest.getAcceptedState()).nodes(
127-
DiscoveryNodes.builder(publishRequest.getAcceptedState().nodes()).localNodeId(getLocalNode().getId()).build()).build());
125+
assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode()) :
126+
publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();
128127

129128
synchronized (mutex) {
130129
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();

test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.elasticsearch.cluster.ClusterModule;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2426
import org.elasticsearch.test.transport.MockTransport;
2527
import org.elasticsearch.transport.ConnectTransportException;
2628
import org.elasticsearch.transport.RequestHandlerRegistry;
@@ -29,8 +31,11 @@
2931
import org.elasticsearch.transport.TransportResponse;
3032
import org.elasticsearch.transport.TransportResponseOptions;
3133

34+
import java.io.IOException;
3235
import java.util.Optional;
3336

37+
import static org.elasticsearch.test.ESTestCase.copyWriteable;
38+
3439
public abstract class DisruptableMockTransport extends MockTransport {
3540
private final Logger logger;
3641

@@ -68,7 +73,6 @@ public String toString() {
6873
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
6974

7075
assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
71-
super.onSendRequest(requestId, action, request, destination);
7276

7377
final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}",
7478
action, requestId, getLocalNode(), destination).getFormattedMessage();
@@ -162,8 +166,15 @@ public String toString() {
162166
}
163167
};
164168

169+
final TransportRequest copiedRequest;
170+
try {
171+
copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest);
172+
} catch (IOException e) {
173+
throw new AssertionError("exception de/serializing request", e);
174+
}
175+
165176
try {
166-
requestHandler.processMessageReceived(request, transportChannel);
177+
requestHandler.processMessageReceived(copiedRequest, transportChannel);
167178
} catch (Exception e) {
168179
try {
169180
transportChannel.sendResponse(e);
@@ -181,6 +192,10 @@ public String toString() {
181192
});
182193
}
183194

195+
private NamedWriteableRegistry writeableRegistry() {
196+
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
197+
}
198+
184199
public enum ConnectionStatus {
185200
CONNECTED,
186201
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException

test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ public void clear() {
111111

112112
@Override
113113
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
114-
super.onSendRequest(requestId, action, request, node);
115114
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request));
116115
}
117116

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ public void close() {
181181
}
182182

183183
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
184-
185184
}
186185

187186
protected boolean nodeConnected(DiscoveryNode discoveryNode) {

0 commit comments

Comments
 (0)