Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class TransportResponse extends TransportMessage {

public static class Empty extends TransportResponse {
public static final Empty INSTANCE = new Empty();

@Override
public String toString() {
return "Empty{}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -38,13 +39,8 @@
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;

Expand All @@ -55,7 +51,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -184,15 +179,15 @@ class ClusterNode extends AbstractComponent {
private final PersistedState persistedState;
private MasterService masterService;
private TransportService transportService;
private MockTransport mockTransport;
private DisruptableMockTransport mockTransport;

ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
setUp();
onNode(this::setUp, localNode).run();
}

private DiscoveryNode createDiscoveryNode() {
Expand All @@ -206,112 +201,44 @@ private DiscoveryNode createDiscoveryNode() {
}

private void setUp() {
mockTransport = new MockTransport() {
mockTransport = new DisruptableMockTransport(logger) {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself";
super.onSendRequest(requestId, action, request, destination);
protected DiscoveryNode getLocalNode() {
return localNode;
}

@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
}

// connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later
final Consumer<Runnable> scheduler;
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action) {
final Predicate<ClusterNode> matchesDestination;
if (action.equals(HANDSHAKE_ACTION_NAME)) {
scheduler = Runnable::run;
matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress());
matchesDestination = n -> n.getLocalNode().getAddress().equals(node.getAddress());
} else {
scheduler = deterministicTaskQueue::scheduleNow;
matchesDestination = n -> n.getLocalNode().equals(destination);
matchesDestination = n -> n.getLocalNode().equals(node);
}
return clusterNodes.stream().filter(matchesDestination).findAny().map(cn -> cn.mockTransport);
}

scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
clusterNodes.stream().filter(matchesDestination).findAny().ifPresent(
destinationNode -> {

final RequestHandlerRegistry requestHandler
= destinationNode.mockTransport.getRequestHandler(action);

final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}

@Override
public String getChannelType() {
return "coordinator-test-channel";
}

@Override
public void sendResponse(final TransportResponse response) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of response " + response
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleResponse(requestId, response);
}
});
}

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) {
sendResponse(response);
}

@Override
public void sendResponse(Exception exception) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of error response " + exception.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, exception);
}
});
}
};

try {
processMessageReceived(request, requestHandler, transportChannel);
} catch (Exception e) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of processing error response " + e.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, e);
}
});
}
}
);
}
});
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
// handshake needs to run inline as the caller blockingly waits on the result
if (action.equals(HANDSHAKE_ACTION_NAME)) {
onNode(doDelivery, destination).run();
} else {
deterministicTaskQueue.scheduleNow(onNode(doDelivery, destination));
}
}
};

masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
masterService = new FakeThreadPoolMasterService("test",
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable, localNode)));
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet());
settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(runnable, localNode)), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
Expand Down Expand Up @@ -359,9 +286,19 @@ private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
}
}

@SuppressWarnings("unchecked")
private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler,
TransportChannel transportChannel) throws Exception {
requestHandler.processMessageReceived(request, transportChannel);
private static Runnable onNode(Runnable runnable, DiscoveryNode node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to put the DiscoveryNode thing first, otherwise it ends up a long way from the function call if the Runnable is an anonymous class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. With the recent changes, there were no longer any anonymous classes. I've still changed the order

return new Runnable() {
@Override
public void run() {
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", node.getId())) {
runnable.run();
}
}

@Override
public String toString() {
return node.getId() + ": " + runnable.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that having the UUID of the disco node will be useful here when we start rebooting nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the ephemeral id

}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DeterministicTaskQueue extends AbstractComponent {

Expand Down Expand Up @@ -182,6 +183,13 @@ public void advanceTime() {
* @return A <code>ExecutorService</code> that uses this task queue.
*/
public ExecutorService getExecutorService() {
return getExecutorService(Function.identity());
}

/**
* @return A <code>ExecutorService</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ExecutorService getExecutorService(Function<Runnable, Runnable> runnableWrapper) {
return new ExecutorService() {

@Override
Expand Down Expand Up @@ -246,7 +254,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti

@Override
public void execute(Runnable command) {
scheduleNow(command);
scheduleNow(runnableWrapper.apply(command));
}
};
}
Expand All @@ -255,6 +263,13 @@ public void execute(Runnable command) {
* @return A <code>ThreadPool</code> that uses this task queue.
*/
public ThreadPool getThreadPool() {
return getThreadPool(Function.identity());
}

/**
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
return new ThreadPool(settings) {

{
Expand Down Expand Up @@ -303,12 +318,12 @@ public ThreadPoolStats stats() {

@Override
public ExecutorService generic() {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
public ExecutorService executor(String name) {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
Expand All @@ -318,7 +333,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
final int CANCELLED = 2;
final AtomicInteger taskState = new AtomicInteger(NOT_STARTED);

scheduleAt(currentTimeMillis + delay.millis(), new Runnable() {
scheduleAt(currentTimeMillis + delay.millis(), runnableWrapper.apply(new Runnable() {
@Override
public void run() {
if (taskState.compareAndSet(NOT_STARTED, STARTED)) {
Expand All @@ -330,7 +345,7 @@ public void run() {
public String toString() {
return command.toString();
}
});
}));

return new ScheduledFuture<Object>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ private TestThreadInfoPatternConverter() {
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
toAppendTo.append(threadInfo(event.getThreadName()));
if (event.getContextData().isEmpty() == false) {
toAppendTo.append(event.getContextData());
}
}

private static final Pattern ELASTICSEARCH_THREAD_NAME_PATTERN =
Expand All @@ -66,6 +69,7 @@ public void format(LogEvent event, StringBuilder toAppendTo) {
Pattern.compile("SUITE-.+-worker");
private static final Pattern NOT_YET_NAMED_NODE_THREAD_NAME_PATTERN =
Pattern.compile("test_SUITE-CHILD_VM.+cluster\\[T#(.+)\\]");

static String threadInfo(String threadName) {
Matcher m = ELASTICSEARCH_THREAD_NAME_PATTERN.matcher(threadName);
if (m.matches()) {
Expand Down
Loading