Skip to content

Commit de92d2e

Browse files
authored
Move connection listener to ConnectionManager (#32956)
This is a followup to #31886. After that commit the TransportConnectionListener had to be propogated to both the Transport and the ConnectionManager. This commit moves that listener to completely live in the ConnectionManager. The request and response related methods are moved to a TransportMessageListener. That listener continues to live in the Transport class.
1 parent f82bb64 commit de92d2e

File tree

16 files changed

+167
-189
lines changed

16 files changed

+167
-189
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
25-
import org.elasticsearch.common.network.CloseableChannel;
2625
import org.elasticsearch.common.network.NetworkService;
2726
import org.elasticsearch.common.settings.ClusterSettings;
2827
import org.elasticsearch.common.settings.Settings;
@@ -87,13 +86,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster
8786
return transportService;
8887
}
8988

90-
@Override
91-
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
92-
final Netty4Transport t = (Netty4Transport) transport;
93-
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
94-
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
95-
}
96-
9789
public void testConnectException() throws UnknownHostException {
9890
try {
9991
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
25-
import org.elasticsearch.common.network.CloseableChannel;
2625
import org.elasticsearch.common.network.NetworkService;
2726
import org.elasticsearch.common.settings.ClusterSettings;
2827
import org.elasticsearch.common.settings.Settings;
@@ -93,12 +92,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster
9392
return transportService;
9493
}
9594

96-
@Override
97-
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
98-
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
99-
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
100-
}
101-
10295
public void testConnectException() throws UnknownHostException {
10396
try {
10497
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public void removeListener(TransportConnectionListener listener) {
9191
}
9292

9393
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
94-
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
94+
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
95+
return internalOpenConnection(node, resolvedProfile);
9596
}
9697

9798
/**
@@ -115,7 +116,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
115116
}
116117
boolean success = false;
117118
try {
118-
connection = transport.openConnection(node, resolvedProfile);
119+
connection = internalOpenConnection(node, resolvedProfile);
119120
connectionValidator.accept(connection, resolvedProfile);
120121
// we acquire a connection lock, so no way there is an existing connection
121122
connectedNodes.put(node, connection);
@@ -227,6 +228,19 @@ public void close() {
227228
}
228229
}
229230

231+
private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
232+
Transport.Connection connection = transport.openConnection(node, connectionProfile);
233+
try {
234+
connectionListener.onConnectionOpened(connection);
235+
} finally {
236+
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
237+
}
238+
if (connection.isClosed()) {
239+
throw new ConnectTransportException(node, "a channel closed while connecting");
240+
}
241+
return connection;
242+
}
243+
230244
private void ensureOpen() {
231245
if (lifecycle.started() == false) {
232246
throw new IllegalStateException("connection manager is closed");
@@ -289,6 +303,20 @@ public void onNodeConnected(DiscoveryNode node) {
289303
listener.onNodeConnected(node);
290304
}
291305
}
306+
307+
@Override
308+
public void onConnectionOpened(Transport.Connection connection) {
309+
for (TransportConnectionListener listener : listeners) {
310+
listener.onConnectionOpened(connection);
311+
}
312+
}
313+
314+
@Override
315+
public void onConnectionClosed(Transport.Connection connection) {
316+
for (TransportConnectionListener listener : listeners) {
317+
listener.onConnectionClosed(connection);
318+
}
319+
}
292320
}
293321

294322
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 18 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
184184
protected final NetworkService networkService;
185185
protected final Set<ProfileSettings> profileSettings;
186186

187-
private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
187+
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
188188

189189
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
190190
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
@@ -248,14 +248,12 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
248248
protected void doStart() {
249249
}
250250

251-
@Override
252-
public void addConnectionListener(TransportConnectionListener listener) {
253-
transportListener.listeners.add(listener);
251+
public void addMessageListener(TransportMessageListener listener) {
252+
messageListener.listeners.add(listener);
254253
}
255254

256-
@Override
257-
public boolean removeConnectionListener(TransportConnectionListener listener) {
258-
return transportListener.listeners.remove(listener);
255+
public boolean removeMessageListener(TransportMessageListener listener) {
256+
return messageListener.listeners.remove(listener);
259257
}
260258

261259
@Override
@@ -344,10 +342,6 @@ public TcpChannel channel(TransportRequestOptions.Type type) {
344342
return connectionTypeHandle.getChannel(channels);
345343
}
346344

347-
boolean allChannelsOpen() {
348-
return channels.stream().allMatch(TcpChannel::isOpen);
349-
}
350-
351345
@Override
352346
public boolean sendPing() {
353347
for (TcpChannel channel : channels) {
@@ -481,22 +475,13 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
481475
// underlying channels.
482476
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
483477
final NodeChannels finalNodeChannels = nodeChannels;
484-
try {
485-
transportListener.onConnectionOpened(nodeChannels);
486-
} finally {
487-
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
488-
}
489478

490479
Consumer<TcpChannel> onClose = c -> {
491480
assert c.isOpen() == false : "channel is still open when onClose is called";
492481
finalNodeChannels.close();
493482
};
494483

495484
nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));
496-
497-
if (nodeChannels.allChannelsOpen() == false) {
498-
throw new ConnectTransportException(node, "a channel closed while connecting");
499-
}
500485
success = true;
501486
return nodeChannels;
502487
} catch (ConnectTransportException e) {
@@ -907,7 +892,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
907892
final TransportRequestOptions finalOptions = options;
908893
// this might be called in a different thread
909894
SendListener onRequestSent = new SendListener(channel, stream,
910-
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
895+
() -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
911896
internalSendMessage(channel, message, onRequestSent);
912897
addedReleaseListener = true;
913898
} finally {
@@ -961,7 +946,7 @@ public void sendErrorResponse(
961946
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
962947
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
963948
SendListener onResponseSent = new SendListener(channel, null,
964-
() -> transportListener.onResponseSent(requestId, action, error), message.length());
949+
() -> messageListener.onResponseSent(requestId, action, error), message.length());
965950
internalSendMessage(channel, message, onResponseSent);
966951
}
967952
}
@@ -1010,7 +995,7 @@ private void sendResponse(
1010995
final TransportResponseOptions finalOptions = options;
1011996
// this might be called in a different thread
1012997
SendListener listener = new SendListener(channel, stream,
1013-
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
998+
() -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length());
1014999
internalSendMessage(channel, message, listener);
10151000
addedReleaseListener = true;
10161001
} finally {
@@ -1266,7 +1251,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel)
12661251
if (isHandshake) {
12671252
handler = pendingHandshakes.remove(requestId);
12681253
} else {
1269-
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
1254+
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
12701255
if (theHandler == null && TransportStatus.isError(status)) {
12711256
handler = pendingHandshakes.remove(requestId);
12721257
} else {
@@ -1373,7 +1358,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
13731358
features = Collections.emptySet();
13741359
}
13751360
final String action = stream.readString();
1376-
transportListener.onRequestReceived(requestId, action);
1361+
messageListener.onRequestReceived(requestId, action);
13771362
TransportChannel transportChannel = null;
13781363
try {
13791364
if (TransportStatus.isHandshake(status)) {
@@ -1682,69 +1667,42 @@ public ProfileSettings(Settings settings, String profileName) {
16821667
}
16831668
}
16841669

1685-
private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
1686-
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
1670+
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
1671+
1672+
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
16871673

16881674
@Override
16891675
public void onRequestReceived(long requestId, String action) {
1690-
for (TransportConnectionListener listener : listeners) {
1676+
for (TransportMessageListener listener : listeners) {
16911677
listener.onRequestReceived(requestId, action);
16921678
}
16931679
}
16941680

16951681
@Override
16961682
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
1697-
for (TransportConnectionListener listener : listeners) {
1683+
for (TransportMessageListener listener : listeners) {
16981684
listener.onResponseSent(requestId, action, response, finalOptions);
16991685
}
17001686
}
17011687

17021688
@Override
17031689
public void onResponseSent(long requestId, String action, Exception error) {
1704-
for (TransportConnectionListener listener : listeners) {
1690+
for (TransportMessageListener listener : listeners) {
17051691
listener.onResponseSent(requestId, action, error);
17061692
}
17071693
}
17081694

17091695
@Override
17101696
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
17111697
TransportRequestOptions finalOptions) {
1712-
for (TransportConnectionListener listener : listeners) {
1698+
for (TransportMessageListener listener : listeners) {
17131699
listener.onRequestSent(node, requestId, action, request, finalOptions);
17141700
}
17151701
}
17161702

1717-
@Override
1718-
public void onNodeDisconnected(DiscoveryNode key) {
1719-
for (TransportConnectionListener listener : listeners) {
1720-
listener.onNodeDisconnected(key);
1721-
}
1722-
}
1723-
1724-
@Override
1725-
public void onConnectionOpened(Connection nodeChannels) {
1726-
for (TransportConnectionListener listener : listeners) {
1727-
listener.onConnectionOpened(nodeChannels);
1728-
}
1729-
}
1730-
1731-
@Override
1732-
public void onNodeConnected(DiscoveryNode node) {
1733-
for (TransportConnectionListener listener : listeners) {
1734-
listener.onNodeConnected(node);
1735-
}
1736-
}
1737-
1738-
@Override
1739-
public void onConnectionClosed(Connection nodeChannels) {
1740-
for (TransportConnectionListener listener : listeners) {
1741-
listener.onConnectionClosed(nodeChannels);
1742-
}
1743-
}
1744-
17451703
@Override
17461704
public void onResponseReceived(long requestId, ResponseContext holder) {
1747-
for (TransportConnectionListener listener : listeners) {
1705+
for (TransportMessageListener listener : listeners) {
17481706
listener.onResponseReceived(requestId, holder);
17491707
}
17501708
}

server/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent {
5656
*/
5757
RequestHandlerRegistry getRequestHandler(String action);
5858

59-
/**
60-
* Adds a new event listener
61-
* @param listener the listener to add
62-
*/
63-
void addConnectionListener(TransportConnectionListener listener);
59+
void addMessageListener(TransportMessageListener listener);
6460

65-
/**
66-
* Removes an event listener
67-
* @param listener the listener to remove
68-
* @return <code>true</code> iff the listener was removed otherwise <code>false</code>
69-
*/
70-
boolean removeConnectionListener(TransportConnectionListener listener);
61+
boolean removeMessageListener(TransportMessageListener listener);
7162

7263
/**
7364
* The address the transport is bound on.
@@ -254,7 +245,7 @@ public List<ResponseContext> prune(Predicate<ResponseContext> predicate) {
254245
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
255246
* found.
256247
*/
257-
public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) {
248+
public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
258249
ResponseContext context = handlers.remove(requestId);
259250
listener.onResponseReceived(requestId, context);
260251
if (context == null) {

server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,42 +28,6 @@
2828
*/
2929
public interface TransportConnectionListener {
3030

31-
/**
32-
* Called once a request is received
33-
* @param requestId the internal request ID
34-
* @param action the request action
35-
*
36-
*/
37-
default void onRequestReceived(long requestId, String action) {}
38-
39-
/**
40-
* Called for every action response sent after the response has been passed to the underlying network implementation.
41-
* @param requestId the request ID (unique per client)
42-
* @param action the request action
43-
* @param response the response send
44-
* @param finalOptions the response options
45-
*/
46-
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}
47-
48-
/***
49-
* Called for every failed action response after the response has been passed to the underlying network implementation.
50-
* @param requestId the request ID (unique per client)
51-
* @param action the request action
52-
* @param error the error sent back to the caller
53-
*/
54-
default void onResponseSent(long requestId, String action, Exception error) {}
55-
56-
/**
57-
* Called for every request sent to a server after the request has been passed to the underlying network implementation
58-
* @param node the node the request was sent to
59-
* @param requestId the internal request id
60-
* @param action the action name
61-
* @param request the actual request
62-
* @param finalOptions the request options
63-
*/
64-
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
65-
TransportRequestOptions finalOptions) {}
66-
6731
/**
6832
* Called once a connection was opened
6933
* @param connection the connection
@@ -76,13 +40,6 @@ default void onConnectionOpened(Transport.Connection connection) {}
7640
*/
7741
default void onConnectionClosed(Transport.Connection connection) {}
7842

79-
/**
80-
* Called for every response received
81-
* @param requestId the request id for this reponse
82-
* @param context the response context or null if the context was already processed ie. due to a timeout.
83-
*/
84-
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}
85-
8643
/**
8744
* Called once a node connection is opened and registered.
8845
*/

0 commit comments

Comments
 (0)