3737import org .elasticsearch .common .bytes .BytesArray ;
3838import org .elasticsearch .common .bytes .BytesReference ;
3939import org .elasticsearch .common .bytes .CompositeBytesReference ;
40+ import org .elasticsearch .common .collect .MapBuilder ;
4041import org .elasticsearch .common .component .AbstractLifecycleComponent ;
4142import org .elasticsearch .common .component .Lifecycle ;
4243import org .elasticsearch .common .compress .Compressor ;
9798import java .util .TreeSet ;
9899import java .util .concurrent .ConcurrentHashMap ;
99100import java .util .concurrent .ConcurrentMap ;
101+ import java .util .concurrent .CopyOnWriteArrayList ;
100102import java .util .concurrent .CountDownLatch ;
101103import java .util .concurrent .TimeUnit ;
102104import java .util .concurrent .atomic .AtomicBoolean ;
103- import java .util .concurrent .atomic .AtomicLong ;
104105import java .util .concurrent .atomic .AtomicReference ;
105106import java .util .concurrent .locks .ReadWriteLock ;
106107import java .util .concurrent .locks .ReentrantReadWriteLock ;
@@ -201,7 +202,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
201202 protected final NetworkService networkService ;
202203 protected final Set <ProfileSettings > profileSettings ;
203204
204- private volatile TransportService transportService ;
205+ private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener () ;
205206
206207 private final ConcurrentMap <String , BoundTransportAddress > profileBoundAddresses = newConcurrentMap ();
207208 // node id to actual channel
@@ -221,12 +222,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
221222 protected final ConnectionProfile defaultConnectionProfile ;
222223
223224 private final ConcurrentMap <Long , HandshakeResponseHandler > pendingHandshakes = new ConcurrentHashMap <>();
224- private final AtomicLong requestIdGenerator = new AtomicLong ();
225225 private final CounterMetric numHandshakes = new CounterMetric ();
226226 private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake" ;
227227
228228 private final MeanMetric readBytesMetric = new MeanMetric ();
229229 private final MeanMetric transmittedBytesMetric = new MeanMetric ();
230+ private volatile Map <String , RequestHandlerRegistry > requestHandlers = Collections .emptyMap ();
231+ private final ResponseHandlers responseHandlers = new ResponseHandlers ();
230232
231233 public TcpTransport (String transportName , Settings settings , ThreadPool threadPool , BigArrays bigArrays ,
232234 CircuitBreakerService circuitBreakerService , NamedWriteableRegistry namedWriteableRegistry ,
@@ -283,18 +285,28 @@ protected void doStart() {
283285 }
284286 }
285287
288+ @ Override
289+ public void addConnectionListener (TransportConnectionListener listener ) {
290+ transportListener .listeners .add (listener );
291+ }
292+
293+ @ Override
294+ public boolean removeConnectionListener (TransportConnectionListener listener ) {
295+ return transportListener .listeners .remove (listener );
296+ }
297+
286298 @ Override
287299 public CircuitBreaker getInFlightRequestBreaker () {
288300 // We always obtain a fresh breaker to reflect changes to the breaker configuration.
289301 return circuitBreakerService .getBreaker (CircuitBreaker .IN_FLIGHT_REQUESTS );
290302 }
291303
292304 @ Override
293- public void setTransportService ( TransportService service ) {
294- if (service . getRequestHandler ( HANDSHAKE_ACTION_NAME ) != null ) {
295- throw new IllegalStateException ( HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered" );
305+ public synchronized < Request extends TransportRequest > void registerRequestHandler ( RequestHandlerRegistry < Request > reg ) {
306+ if (requestHandlers . containsKey ( reg . getAction ()) ) {
307+ throw new IllegalArgumentException ( "transport handlers for action " + reg . getAction () + " is already registered" );
296308 }
297- this . transportService = service ;
309+ requestHandlers = MapBuilder . newMapBuilder ( requestHandlers ). put ( reg . getAction (), reg ). immutableMap () ;
298310 }
299311
300312 private static class HandshakeResponseHandler implements TransportResponseHandler <VersionHandshakeResponse > {
@@ -479,7 +491,7 @@ public void close() {
479491 boolean block = lifecycle .stopped () && Transports .isTransportThread (Thread .currentThread ()) == false ;
480492 TcpChannel .closeChannels (channels , block );
481493 } finally {
482- transportService .onConnectionClosed (this );
494+ transportListener .onConnectionClosed (this );
483495 }
484496 }
485497 }
@@ -535,7 +547,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
535547 logger .debug ("connected to node [{}]" , node );
536548 }
537549 try {
538- transportService .onNodeConnected (node );
550+ transportListener .onNodeConnected (node );
539551 } finally {
540552 if (nodeChannels .isClosed ()) {
541553 // we got closed concurrently due to a disconnect or some other event on the channel.
@@ -547,7 +559,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
547559 // try to remove it first either way one of the two wins even if the callback has run before we even added the
548560 // tuple to the map since in that case we remove it here again
549561 if (connectedNodes .remove (node , nodeChannels )) {
550- transportService .onNodeDisconnected (node );
562+ transportListener .onNodeDisconnected (node );
551563 }
552564 throw new NodeNotConnectedException (node , "connection concurrently closed" );
553565 }
@@ -649,7 +661,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
649661 // At this point we should construct the connection, notify the transport service, and attach close listeners to the
650662 // underlying channels.
651663 nodeChannels = new NodeChannels (node , channels , connectionProfile , version );
652- transportService .onConnectionOpened (nodeChannels );
664+ transportListener .onConnectionOpened (nodeChannels );
653665 final NodeChannels finalNodeChannels = nodeChannels ;
654666 final AtomicBoolean runOnce = new AtomicBoolean (false );
655667 Consumer <TcpChannel > onClose = c -> {
@@ -692,7 +704,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n
692704 if (closeLock .readLock ().tryLock ()) {
693705 try {
694706 if (connectedNodes .remove (node , nodeChannels )) {
695- transportService .onNodeDisconnected (node );
707+ transportListener .onNodeDisconnected (node );
696708 }
697709 } finally {
698710 closeLock .readLock ().unlock ();
@@ -719,7 +731,7 @@ public void disconnectFromNode(DiscoveryNode node) {
719731 } finally {
720732 closeLock .readLock ().unlock ();
721733 if (nodeChannels != null ) { // if we found it and removed it we close and notify
722- IOUtils .closeWhileHandlingException (nodeChannels , () -> transportService .onNodeDisconnected (node ));
734+ IOUtils .closeWhileHandlingException (nodeChannels , () -> transportListener .onNodeDisconnected (node ));
723735 }
724736 }
725737 }
@@ -976,7 +988,7 @@ protected final void doStop() {
976988 Map .Entry <DiscoveryNode , NodeChannels > next = iterator .next ();
977989 try {
978990 IOUtils .closeWhileHandlingException (next .getValue ());
979- transportService .onNodeDisconnected (next .getKey ());
991+ transportListener .onNodeDisconnected (next .getKey ());
980992 } finally {
981993 iterator .remove ();
982994 }
@@ -1120,7 +1132,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
11201132 final TransportRequestOptions finalOptions = options ;
11211133 // this might be called in a different thread
11221134 SendListener onRequestSent = new SendListener (channel , stream ,
1123- () -> transportService .onRequestSent (node , requestId , action , request , finalOptions ), message .length ());
1135+ () -> transportListener .onRequestSent (node , requestId , action , request , finalOptions ), message .length ());
11241136 internalSendMessage (channel , message , onRequestSent );
11251137 addedReleaseListener = true ;
11261138 } finally {
@@ -1174,7 +1186,7 @@ public void sendErrorResponse(
11741186 final BytesReference header = buildHeader (requestId , status , nodeVersion , bytes .length ());
11751187 CompositeBytesReference message = new CompositeBytesReference (header , bytes );
11761188 SendListener onResponseSent = new SendListener (channel , null ,
1177- () -> transportService .onResponseSent (requestId , action , error ), message .length ());
1189+ () -> transportListener .onResponseSent (requestId , action , error ), message .length ());
11781190 internalSendMessage (channel , message , onResponseSent );
11791191 }
11801192 }
@@ -1223,7 +1235,7 @@ private void sendResponse(
12231235 final TransportResponseOptions finalOptions = options ;
12241236 // this might be called in a different thread
12251237 SendListener listener = new SendListener (channel , stream ,
1226- () -> transportService .onResponseSent (requestId , action , response , finalOptions ), message .length ());
1238+ () -> transportListener .onResponseSent (requestId , action , response , finalOptions ), message .length ());
12271239 internalSendMessage (channel , message , listener );
12281240 addedReleaseListener = true ;
12291241 } finally {
@@ -1418,7 +1430,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel,
14181430 if (isHandshake ) {
14191431 handler = pendingHandshakes .remove (requestId );
14201432 } else {
1421- TransportResponseHandler theHandler = transportService .onResponseReceived (requestId );
1433+ TransportResponseHandler theHandler = responseHandlers .onResponseReceived (requestId , transportListener );
14221434 if (theHandler == null && TransportStatus .isError (status )) {
14231435 handler = pendingHandshakes .remove (requestId );
14241436 } else {
@@ -1525,15 +1537,15 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
15251537 features = Collections .emptySet ();
15261538 }
15271539 final String action = stream .readString ();
1528- transportService .onRequestReceived (requestId , action );
1540+ transportListener .onRequestReceived (requestId , action );
15291541 TransportChannel transportChannel = null ;
15301542 try {
15311543 if (TransportStatus .isHandshake (status )) {
15321544 final VersionHandshakeResponse response = new VersionHandshakeResponse (getCurrentVersion ());
15331545 sendResponse (version , features , channel , response , requestId , HANDSHAKE_ACTION_NAME , TransportResponseOptions .EMPTY ,
15341546 TransportStatus .setHandshake ((byte ) 0 ));
15351547 } else {
1536- final RequestHandlerRegistry reg = transportService . getRequestHandler (action );
1548+ final RequestHandlerRegistry reg = getRequestHandler (action );
15371549 if (reg == null ) {
15381550 throw new ActionNotFoundTransportException (action );
15391551 }
@@ -1640,7 +1652,7 @@ public void writeTo(StreamOutput out) throws IOException {
16401652 protected Version executeHandshake (DiscoveryNode node , TcpChannel channel , TimeValue timeout )
16411653 throws IOException , InterruptedException {
16421654 numHandshakes .inc ();
1643- final long requestId = newRequestId ();
1655+ final long requestId = responseHandlers . newRequestId ();
16441656 final HandshakeResponseHandler handler = new HandshakeResponseHandler (channel );
16451657 AtomicReference <Version > versionRef = handler .versionRef ;
16461658 AtomicReference <Exception > exceptionRef = handler .exceptionRef ;
@@ -1690,11 +1702,6 @@ final long getNumHandshakes() {
16901702 return numHandshakes .count (); // for testing
16911703 }
16921704
1693- @ Override
1694- public long newRequestId () {
1695- return requestIdGenerator .incrementAndGet ();
1696- }
1697-
16981705 /**
16991706 * Called once the channel is closed for instance due to a disconnect or a closed socket etc.
17001707 */
@@ -1838,4 +1845,82 @@ public ProfileSettings(Settings settings, String profileName) {
18381845 PUBLISH_PORT_PROFILE .getConcreteSettingForNamespace (profileName ).get (settings );
18391846 }
18401847 }
1848+
1849+ private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
1850+ private final List <TransportConnectionListener > listeners = new CopyOnWriteArrayList <>();
1851+
1852+ @ Override
1853+ public void onRequestReceived (long requestId , String action ) {
1854+ for (TransportConnectionListener listener : listeners ) {
1855+ listener .onRequestReceived (requestId , action );
1856+ }
1857+ }
1858+
1859+ @ Override
1860+ public void onResponseSent (long requestId , String action , TransportResponse response , TransportResponseOptions finalOptions ) {
1861+ for (TransportConnectionListener listener : listeners ) {
1862+ listener .onResponseSent (requestId , action , response , finalOptions );
1863+ }
1864+ }
1865+
1866+ @ Override
1867+ public void onResponseSent (long requestId , String action , Exception error ) {
1868+ for (TransportConnectionListener listener : listeners ) {
1869+ listener .onResponseSent (requestId , action , error );
1870+ }
1871+ }
1872+
1873+ @ Override
1874+ public void onRequestSent (DiscoveryNode node , long requestId , String action , TransportRequest request ,
1875+ TransportRequestOptions finalOptions ) {
1876+ for (TransportConnectionListener listener : listeners ) {
1877+ listener .onRequestSent (node , requestId , action , request , finalOptions );
1878+ }
1879+ }
1880+
1881+ @ Override
1882+ public void onNodeDisconnected (DiscoveryNode key ) {
1883+ for (TransportConnectionListener listener : listeners ) {
1884+ listener .onNodeDisconnected (key );
1885+ }
1886+ }
1887+
1888+ @ Override
1889+ public void onConnectionOpened (Connection nodeChannels ) {
1890+ for (TransportConnectionListener listener : listeners ) {
1891+ listener .onConnectionOpened (nodeChannels );
1892+ }
1893+ }
1894+
1895+ @ Override
1896+ public void onNodeConnected (DiscoveryNode node ) {
1897+ for (TransportConnectionListener listener : listeners ) {
1898+ listener .onNodeConnected (node );
1899+ }
1900+ }
1901+
1902+ @ Override
1903+ public void onConnectionClosed (Connection nodeChannels ) {
1904+ for (TransportConnectionListener listener : listeners ) {
1905+ listener .onConnectionClosed (nodeChannels );
1906+ }
1907+ }
1908+
1909+ @ Override
1910+ public void onResponseReceived (long requestId , ResponseContext holder ) {
1911+ for (TransportConnectionListener listener : listeners ) {
1912+ listener .onResponseReceived (requestId , holder );
1913+ }
1914+ }
1915+ }
1916+
1917+ @ Override
1918+ public final ResponseHandlers getResponseHandlers () {
1919+ return responseHandlers ;
1920+ }
1921+
1922+ @ Override
1923+ public final RequestHandlerRegistry getRequestHandler (String action ) {
1924+ return requestHandlers .get (action );
1925+ }
18411926}
0 commit comments