Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6b6981f
Work on connection manager
Tim-Brooks Jul 6, 2018
744375e
WIP
Tim-Brooks Jul 6, 2018
002b371
WIP
Tim-Brooks Jul 7, 2018
06a0093
At least fix checkstyle
Tim-Brooks Jul 7, 2018
f9e7080
Fix test
Tim-Brooks Jul 9, 2018
67bc0b2
Add comment
Tim-Brooks Jul 9, 2018
31e3acf
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 16, 2018
17a334a
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 23, 2018
c3544ad
Remove additional listeners
Tim-Brooks Jul 23, 2018
fc886bf
Work on fixing tests
Tim-Brooks Jul 23, 2018
f699321
Remove unused
Tim-Brooks Jul 24, 2018
03acc19
Fix checkstyle
Tim-Brooks Jul 24, 2018
796a728
Work on test infra
Tim-Brooks Jul 24, 2018
8005de4
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 24, 2018
43dd3ad
Work on tests
Tim-Brooks Jul 25, 2018
acb743a
Remove methods
Tim-Brooks Jul 25, 2018
2eacbfa
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 25, 2018
ca9fe7c
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 25, 2018
aaf2d87
Use new behaviors
Tim-Brooks Jul 25, 2018
f5778c0
Fix some tests
Tim-Brooks Jul 25, 2018
2302fa1
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 26, 2018
95a7b85
Working on fixing tests
Tim-Brooks Jul 26, 2018
2c5badd
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 26, 2018
5106852
Work on fixing tests
Tim-Brooks Jul 27, 2018
af37b63
Fix tests
Tim-Brooks Jul 27, 2018
3e4b898
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 27, 2018
0436eb0
Work on transport client nodes service tests
Tim-Brooks Jul 27, 2018
f429f9b
fix tests and temporarily mute test
Tim-Brooks Jul 28, 2018
28eafbb
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 30, 2018
c45c5ef
Fix tests
Tim-Brooks Aug 1, 2018
2e8a026
Fix checkstyle
Tim-Brooks Aug 1, 2018
630cfaa
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 1, 2018
3ee4743
Work on cleaning up
Tim-Brooks Aug 1, 2018
293a572
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 1, 2018
2789d0d
Make connection methods required
Tim-Brooks Aug 1, 2018
617521f
Work on tests
Tim-Brooks Aug 1, 2018
76d429c
Fix tests
Tim-Brooks Aug 2, 2018
8c26686
java doc
Tim-Brooks Aug 2, 2018
071490a
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 2, 2018
44f6e1d
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 2, 2018
4c26165
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 6, 2018
453b853
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 7, 2018
d18be8c
Changes from review
Tim-Brooks Aug 13, 2018
f259bae
Changes docs
Tim-Brooks Aug 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,12 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
return esChannel;
}

ScheduledPing getPing() {
return scheduledPing;
long successfulPingCount() {
return successfulPings.count();
}

long failedPingCount() {
return failedPings.count();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -83,22 +80,19 @@ public void testScheduledPing() throws Exception {
serviceB.connectToNode(nodeA);

assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyA.successfulPingCount(), greaterThan(100L));
assertThat(nettyB.successfulPingCount(), greaterThan(100L));
});
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyA.failedPingCount(), equalTo(0L));
assertThat(nettyB.failedPingCount(), equalTo(0L));

serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
(request, channel, task) -> {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
});

Expand Down Expand Up @@ -130,11 +124,11 @@ public void handleException(TransportException exp) {
}

assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyA.successfulPingCount(), greaterThan(200L));
assertThat(nettyB.successfulPingCount(), greaterThan(200L));
});
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyA.failedPingCount(), equalTo(0L));
assertThat(nettyB.failedPingCount(), equalTo(0L));

Releasables.close(serviceA, serviceB);
terminate(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,25 @@ public FaultDetection(Settings settings, ThreadPool threadPool, TransportService

this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
transportService.addNodeConnectionListener(connectionListener);
}
}

@Override
public void close() {
transportService.removeConnectionListener(connectionListener);
transportService.removeNodeConnectionListener(connectionListener);
}

/**
* This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
*/
abstract void handleTransportDisconnect(DiscoveryNode node);

private class FDConnectionListener implements TransportConnectionListener {
private class FDConnectionListener implements TransportConnectionListener.NodeConnection {
@Override
public void onNodeConnected(DiscoveryNode node) {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
AbstractRunnable runnable = new AbstractRunnable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

public class ConnectionManager implements Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some javadocs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also love to see a unittest for this one. that would be awesome!


private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = newConcurrentMap();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final Logger logger;
private final Transport transport;
private final ThreadPool threadPool;
private final TimeValue pingSchedule;
private final Lifecycle lifecycle = new Lifecycle();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Logger logger, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
this.logger = logger;
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = pingSchedule;
this.lifecycle.moveToStarted();

if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
}
}

public void addListener(TransportConnectionListener.NodeConnection listener) {
this.connectionListener.listeners.add(listener);
}

public void removeListener(TransportConnectionListener.NodeConnection listener) {
this.connectionListener.listeners.remove(listener);
}

public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
ensureOpen();
try (Releasable ignored = connectionLock.acquire(node.getId())) {
Transport.Connection connection = connectedNodes.get(node);
if (connection != null) {
return;
}
boolean success = false;
try {
connection = transport.openConnection(node, connectionProfile);
connectionValidator.accept(connection, connectionProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
try {
connectionListener.onNodeConnected(node);
} finally {
// TODO: Need to add node disconnect listener
if (connection.isClosed()) {
// we got closed concurrently due to a disconnect or some other event on the channel.
// the close callback will close the NodeChannel instance first and then try to remove
// the connection from the connected nodes. It will NOT acquire the connectionLock for
// the node to prevent any blocking calls on network threads. Yet, we still establish a happens
// before relationship to the connectedNodes.put since we check if we can remove the
// (DiscoveryNode, NodeChannels) tuple from the map after we closed. Here we check if it's closed an if so we
// try to remove it first either way one of the two wins even if the callback has run before we even added the
// tuple to the map since in that case we remove it here again
if (connectedNodes.remove(node, connection)) {
connectionListener.onNodeDisconnected(node);
}
throw new NodeNotConnectedException(node, "connection concurrently closed");
}
}
success = true;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) { // close the connection if there is a failure
logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
IOUtils.closeWhileHandlingException(connection);
}
}
}
}

public Transport.Connection getConnection(DiscoveryNode node) {
Transport.Connection connection = connectedNodes.get(node);
if (connection == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return connection;
}

public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node);
}

public void disconnectFromNode(DiscoveryNode node) {
// TODO: Do we need to lock here?
Transport.Connection nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) { // if we found it and removed it we close and notify
IOUtils.closeWhileHandlingException(nodeChannels, () -> connectionListener.onNodeDisconnected(node));
}
}

private void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("connection manager is closed");
}
}

@Override
public void close() {
lifecycle.moveToStopped();
// TODO: Either add locking externally or in here.
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
connectionListener.onNodeDisconnected(next.getKey());
} finally {
iterator.remove();
}
}

lifecycle.moveToClosed();
}

private class ScheduledPing extends AbstractLifecycleRunnable {

private ScheduledPing() {
super(lifecycle, logger);
}

@Override
protected void doRunInLifecycle() {
for (Map.Entry<DiscoveryNode, Transport.Connection> entry : connectedNodes.entrySet()) {
Transport.Connection connection = entry.getValue();
if (connection.supportsPing()) {
connection.sendPing();
}
}
}

@Override
protected void onAfterInLifecycle() {
try {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this);
} catch (EsRejectedExecutionException ex) {
if (ex.isExecutorShutdown()) {
logger.debug("couldn't schedule new ping execution, executor is shutting down", ex);
} else {
throw ex;
}
}
}

@Override
public void onFailure(Exception e) {
if (lifecycle.stoppedOrClosed()) {
logger.trace("failed to send ping transport message", e);
} else {
logger.warn("failed to send ping transport message", e);
}
}
}

private static final class DelegatingNodeConnectionListener implements TransportConnectionListener.NodeConnection {

private final List<TransportConnectionListener.NodeConnection> listeners = new CopyOnWriteArrayList<>();

@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener.NodeConnection listener : listeners) {
listener.onNodeDisconnected(key);
}
}

@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener.NodeConnection listener : listeners) {
listener.onNodeConnected(node);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
* {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
* connections per cluster has been reached.
*/
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener.NodeConnection, Closeable {

private final TransportService transportService;
private final ConnectionProfile remoteProfile;
Expand Down Expand Up @@ -121,7 +121,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
transportService.addNodeConnectionListener(this);
}

/**
Expand All @@ -139,6 +139,10 @@ void updateSkipUnavailable(boolean skipUnavailable) {
this.skipUnavailable = skipUnavailable;
}

@Override
public void onNodeConnected(DiscoveryNode node) {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
boolean remove = connectedNodes.remove(node);
Expand Down Expand Up @@ -289,11 +293,26 @@ public void sendRequest(long requestId, String action, TransportRequest request,
TransportActionProxy.wrapRequest(targetNode, request), options);
}

@Override
public boolean supportsPing() {
return proxyConnection.supportsPing();
}

@Override
public void sendPing() {
proxyConnection.sendPing();
}

@Override
public void close() {
assert false: "proxy connections must not be closed";
}

@Override
public boolean isClosed() {
return proxyConnection.isClosed();
}

@Override
public Version getVersion() {
return proxyConnection.getVersion();
Expand Down
Loading