Skip to content

Commit 08447de

Browse files
Tim-Brooksjasontedor
authored andcommitted
Remove client connections from TcpTransport (#31886) (#32954)
This is related to #31835. This commit adds a connection manager that manages client connections to other nodes. This means that the TcpTransport no longer maintains a map of nodes that it is connected to.
1 parent ed70e2c commit 08447de

File tree

43 files changed

+1611
-1196
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1611
-1196
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,12 @@ protected NettyTcpChannel bind(String name, InetSocketAddress address) {
276276
return esChannel;
277277
}
278278

279-
ScheduledPing getPing() {
280-
return scheduledPing;
279+
long successfulPingCount() {
280+
return successfulPings.count();
281+
}
282+
283+
long failedPingCount() {
284+
return failedPings.count();
281285
}
282286

283287
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@
3131
import org.elasticsearch.threadpool.TestThreadPool;
3232
import org.elasticsearch.threadpool.ThreadPool;
3333
import org.elasticsearch.transport.TcpTransport;
34-
import org.elasticsearch.transport.TransportChannel;
3534
import org.elasticsearch.transport.TransportException;
3635
import org.elasticsearch.transport.TransportRequest;
37-
import org.elasticsearch.transport.TransportRequestHandler;
3836
import org.elasticsearch.transport.TransportRequestOptions;
3937
import org.elasticsearch.transport.TransportResponse;
4038
import org.elasticsearch.transport.TransportResponseHandler;
@@ -48,6 +46,7 @@
4846
import static org.hamcrest.Matchers.greaterThan;
4947

5048
public class Netty4ScheduledPingTests extends ESTestCase {
49+
5150
public void testScheduledPing() throws Exception {
5251
ThreadPool threadPool = new TestThreadPool(getClass().getName());
5352

@@ -63,14 +62,14 @@ public void testScheduledPing() throws Exception {
6362
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
6463
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
6564
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
66-
null);
65+
null);
6766
serviceA.start();
6867
serviceA.acceptIncomingRequests();
6968

7069
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
7170
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
7271
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
73-
null);
72+
null);
7473

7574
serviceB.start();
7675
serviceB.acceptIncomingRequests();
@@ -82,22 +81,19 @@ public void testScheduledPing() throws Exception {
8281
serviceB.connectToNode(nodeA);
8382

8483
assertBusy(() -> {
85-
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
86-
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
84+
assertThat(nettyA.successfulPingCount(), greaterThan(100L));
85+
assertThat(nettyB.successfulPingCount(), greaterThan(100L));
8786
});
88-
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
89-
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
87+
assertThat(nettyA.failedPingCount(), equalTo(0L));
88+
assertThat(nettyB.failedPingCount(), equalTo(0L));
9089

9190
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
92-
new TransportRequestHandler<TransportRequest.Empty>() {
93-
@Override
94-
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
95-
try {
96-
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
97-
} catch (IOException e) {
98-
logger.error("Unexpected failure", e);
99-
fail(e.getMessage());
100-
}
91+
(request, channel) -> {
92+
try {
93+
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
94+
} catch (IOException e) {
95+
logger.error("Unexpected failure", e);
96+
fail(e.getMessage());
10197
}
10298
});
10399

@@ -129,11 +125,11 @@ public void handleException(TransportException exp) {
129125
}
130126

131127
assertBusy(() -> {
132-
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
133-
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
128+
assertThat(nettyA.successfulPingCount(), greaterThan(200L));
129+
assertThat(nettyB.successfulPingCount(), greaterThan(200L));
134130
});
135-
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
136-
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
131+
assertThat(nettyA.failedPingCount(), equalTo(0L));
132+
assertThat(nettyB.failedPingCount(), equalTo(0L));
137133

138134
Releasables.close(serviceA, serviceB);
139135
terminate(threadPool);

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onFailure(Exception e) {
101101
}
102102

103103
@Override
104-
protected void doRun() throws Exception {
104+
protected void doRun() {
105105
try (Releasable ignored = nodeLocks.acquire(node)) {
106106
validateAndConnectIfNeeded(node);
107107
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.common.concurrent.CompletableContext;
24+
25+
26+
/**
27+
* Abstract Transport.Connection that provides common close logic.
28+
*/
29+
public abstract class CloseableConnection implements Transport.Connection {
30+
31+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
32+
33+
@Override
34+
public void addCloseListener(ActionListener<Void> listener) {
35+
closeContext.addListener(ActionListener.toBiConsumer(listener));
36+
}
37+
38+
@Override
39+
public boolean isClosed() {
40+
return closeContext.isDone();
41+
}
42+
43+
@Override
44+
public void close() {
45+
// This method is safe to call multiple times as the close context will provide concurrency
46+
// protection and only be completed once. The attached listeners will only be notified once.
47+
closeContext.complete(null);
48+
}
49+
}

0 commit comments

Comments
 (0)