Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void removeListener(TransportConnectionListener listener) {
}

public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
return internalOpenConnection(node, resolvedProfile);
}

/**
Expand All @@ -115,7 +116,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
boolean success = false;
try {
connection = transport.openConnection(node, resolvedProfile);
connection = internalOpenConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
Expand Down Expand Up @@ -227,6 +228,16 @@ public void close() {
}
}

private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
try {
connectionListener.onConnectionOpened(connection);
} finally {
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
}
return connection;
}

private void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("connection manager is closed");
Expand Down Expand Up @@ -289,6 +300,20 @@ public void onNodeConnected(DiscoveryNode node) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionOpened(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(connection);
}
}

@Override
public void onConnectionClosed(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(connection);
}
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
Expand Down
70 changes: 18 additions & 52 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;

private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();

private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
Expand Down Expand Up @@ -248,14 +248,12 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
protected void doStart() {
}

@Override
public void addConnectionListener(TransportConnectionListener listener) {
transportListener.listeners.add(listener);
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}

@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return transportListener.listeners.remove(listener);
public boolean removeMessageListener(TransportMessageListener listener) {
return messageListener.listeners.remove(listener);
}

@Override
Expand Down Expand Up @@ -481,11 +479,6 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
// underlying channels.
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
final NodeChannels finalNodeChannels = nodeChannels;
try {
transportListener.onConnectionOpened(nodeChannels);
} finally {
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
}

Consumer<TcpChannel> onClose = c -> {
assert c.isOpen() == false : "channel is still open when onClose is called";
Expand Down Expand Up @@ -907,7 +900,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(channel, stream,
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
() -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(channel, message, onRequestSent);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -961,7 +954,7 @@ public void sendErrorResponse(
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(channel, null,
() -> transportListener.onResponseSent(requestId, action, error), message.length());
() -> messageListener.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}
Expand Down Expand Up @@ -1010,7 +1003,7 @@ private void sendResponse(
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(channel, stream,
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
() -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -1266,7 +1259,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel)
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
Expand Down Expand Up @@ -1373,7 +1366,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
features = Collections.emptySet();
}
final String action = stream.readString();
transportListener.onRequestReceived(requestId, action);
messageListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
Expand Down Expand Up @@ -1682,69 +1675,42 @@ public ProfileSettings(Settings settings, String profileName) {
}
}

private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
private static final class DelegatingTransportMessageListener implements TransportMessageListener {

private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onRequestReceived(long requestId, String action) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
}
}

@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}

@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}

@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
}
}

@Override
public void onConnectionOpened(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(nodeChannels);
}
}

@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionClosed(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(nodeChannels);
}
}

@Override
public void onResponseReceived(long requestId, ResponseContext holder) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
Expand Down
15 changes: 3 additions & 12 deletions server/src/main/java/org/elasticsearch/transport/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent {
*/
RequestHandlerRegistry getRequestHandler(String action);

/**
* Adds a new event listener
* @param listener the listener to add
*/
void addConnectionListener(TransportConnectionListener listener);
void addMessageListener(TransportMessageListener listener);

/**
* Removes an event listener
* @param listener the listener to remove
* @return <code>true</code> iff the listener was removed otherwise <code>false</code>
*/
boolean removeConnectionListener(TransportConnectionListener listener);
boolean removeMessageListener(TransportMessageListener listener);

/**
* The address the transport is bound on.
Expand Down Expand Up @@ -254,7 +245,7 @@ public List<ResponseContext> prune(Predicate<ResponseContext> predicate) {
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
* found.
*/
public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) {
public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
ResponseContext context = handlers.remove(requestId);
listener.onResponseReceived(requestId, context);
if (context == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,6 @@
*/
public interface TransportConnectionListener {

/**
* Called once a request is received
* @param requestId the internal request ID
* @param action the request action
*
*/
default void onRequestReceived(long requestId, String action) {}

/**
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}

/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param error the error sent back to the caller
*/
default void onResponseSent(long requestId, String action, Exception error) {}

/**
* Called for every request sent to a server after the request has been passed to the underlying network implementation
* @param node the node the request was sent to
* @param requestId the internal request id
* @param action the action name
* @param request the actual request
* @param finalOptions the request options
*/
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {}

/**
* Called once a connection was opened
* @param connection the connection
Expand All @@ -76,13 +40,6 @@ default void onConnectionOpened(Transport.Connection connection) {}
*/
default void onConnectionClosed(Transport.Connection connection) {}

/**
* Called for every response received
* @param requestId the request id for this reponse
* @param context the response context or null if the context was already processed ie. due to a timeout.
*/
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}

/**
* Called once a node connection is opened and registered.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;

import org.elasticsearch.cluster.node.DiscoveryNode;

public interface TransportMessageListener {

/**
* Called once a request is received
* @param requestId the internal request ID
* @param action the request action
*
*/
default void onRequestReceived(long requestId, String action) {}

/**
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}

/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param error the error sent back to the caller
*/
default void onResponseSent(long requestId, String action, Exception error) {}

/**
* Called for every request sent to a server after the request has been passed to the underlying network implementation
* @param node the node the request was sent to
* @param requestId the internal request id
* @param action the action name
* @param request the actual request
* @param finalOptions the request options
*/
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {}

/**
* Called for every response received
* @param requestId the request id for this reponse
* @param context the response context or null if the context was already processed ie. due to a timeout.
*/
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}
}
Loading