Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,12 @@ public BootstrapClusterResponse read(StreamInput in) throws IOException {
}
});
}

/**
 * Describes, for use in log messages, what is still required before the cluster can bootstrap.
 *
 * @return {@code "external cluster bootstrapping"} when {@code initialMasterNodeCount} is zero; otherwise a phrase stating
 *         that at least {@code initialMasterNodeCount} master-eligible nodes must be discovered
 */
public String getBootstrapDescription() {
if (initialMasterNodeCount == 0) {
return "external cluster bootstrapping";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is meant by "external cluster bootstrapping"?

} else {
return "discovery of at least " + initialMasterNodeCount + " master-eligible nodes for cluster bootstrapping";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public static class Builder {
/**
 * Creates a builder without copying any values from an existing {@link CoordinationMetaData}; the body is intentionally empty.
 */
public Builder() {

}

public Builder(CoordinationMetaData state) {
this.term = state.term;
this.lastCommittedConfiguration = state.lastCommittedConfiguration;
Expand Down Expand Up @@ -386,5 +386,17 @@ public static VotingConfiguration of(DiscoveryNode... nodes) {
// this could be used in many more places - TODO use this where appropriate
return new VotingConfiguration(Arrays.stream(nodes).map(DiscoveryNode::getId).collect(Collectors.toSet()));
}

/**
 * Describes in words which nodes make up a quorum of this configuration, based on the size of {@code nodeIds}:
 * an empty configuration yields {@code "cluster bootstrapping"}, one or two ids are named individually, and larger
 * configurations are described as "at least {@code size / 2 + 1}" of the listed ids. The string form of {@code nodeIds}
 * is appended as-is.
 *
 * @return a human-readable description of the quorum
 */
public String getQuorumDescription() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method (and the other describe methods) are without context in their respective classes. I would prefer to have the full construction of the output in warnClusterFormationFailed

if (nodeIds.isEmpty()) {
return "cluster bootstrapping";
} else if (nodeIds.size() == 1) {
return "node with id " + nodeIds;
} else if (nodeIds.size() == 2) {
return "two nodes with ids " + nodeIds;
} else {
return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,14 @@ public void invariant() {
assert publishVotes.isEmpty() || electionWon();
}

/**
 * Describes the quorum needed to win an election in terms of the last-accepted and last-committed voting configurations.
 * When the two configurations are equal only one description is returned; otherwise the accepted configuration's
 * description is followed by {@code " and "} and the committed configuration's description.
 *
 * @return a human-readable description of the required quorum
 */
public String getQuorumDescription() {
    final String acceptedDescription = getLastAcceptedConfiguration().getQuorumDescription();
    if (getLastAcceptedConfiguration().equals(getLastCommittedConfiguration())) {
        return acceptedDescription;
    }
    return acceptedDescription + " and " + getLastCommittedConfiguration().getQuorumDescription();
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
Expand All @@ -52,6 +53,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -931,6 +933,28 @@ protected void onFoundPeersUpdated() {
discoveredNodesListener.accept(foundPeers);
}
}

/**
 * Logs a warning explaining why no leader has been discovered or elected yet.
 * <p>
 * Under {@code mutex}, the required quorum is described either from the coordination state (when the initial configuration
 * is set) or from the cluster bootstrap service. The given {@code foundPeers} are then collected into a
 * {@link VoteCollection} and checked against the coordination state's election quorum, and the result is logged together
 * with the resolved addresses and the string forms of the nodes in {@code clusterStateNodes}.
 * <p>
 * NOTE(review): the {@code isElectionQuorum} check reads {@code coordinationState} outside the {@code synchronized (mutex)}
 * block, unlike the read above it - confirm this is safe.
 * NOTE(review): {@code possibleVotes} is built only from {@code foundPeers}; confirm whether the local node should also be
 * counted.
 *
 * @param clusterStateNodes nodes from the last-known cluster state, included in the message
 * @param resolvedAddresses addresses obtained from the hosts providers, included in the message
 * @param foundPeers        peers discovered so far, used for the quorum check and included in the message
 */
@Override
protected void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers) {
final String quorumDescription;
synchronized (mutex) {
if (isInitialConfigurationSet()) {
quorumDescription = coordinationState.get().getQuorumDescription();
} else {
quorumDescription = clusterBootstrapService.getBootstrapDescription();
}
}

final VoteCollection possibleVotes = new VoteCollection();
foundPeers.forEach(possibleVotes::addVote);
final String isQuorumOrNot = coordinationState.get().isElectionQuorum(possibleVotes) ? "is a quorum" : "is not a quorum";

logger.warn("leader not discovered or elected yet: election requires {}, have discovered {} which {}; discovery " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's swap leader for master.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case this is not a master-eligible node: does it even make sense to talk about elections here? Maybe it should state that it is a non-master-eligible nodes and that it cannot find a master?

"continues using {} from hosts providers and {} from last-known cluster state", quorumDescription, foundPeers,
isQuorumOrNot, resolvedAddresses,
StreamSupport.stream(clusterStateNodes.spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList()));
}
}

private void startElectionScheduler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) {
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
PeerFinder.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING,
ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
Expand Down
45 changes: 43 additions & 2 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

public abstract class PeerFinder {
Expand All @@ -79,10 +80,15 @@ public abstract class PeerFinder {
Setting.timeSetting("discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

/**
 * Node-scoped time setting {@code discovery.cluster_formation_warning_timeout}. Its value is added to the thread pool's
 * relative time to compute when the next cluster-formation warning may be emitted. It is declared with the values 10000ms
 * and 1ms - presumably the default and the minimum respectively; confirm against {@code Setting.timeSetting}.
 */
public static final Setting<TimeValue> DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING =
Setting.timeSetting("discovery.cluster_formation_warning_timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final Settings settings;

private final TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;
private final TimeValue clusterFormationWarningTimeout;

private final Object mutex = new Object();
private final TransportService transportService;
Expand All @@ -94,12 +100,15 @@ public abstract class PeerFinder {
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();
private List<TransportAddress> lastResolvedAddresses = emptyList();
private long nextWarningTimeMillis;

public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
Expand All @@ -120,12 +129,18 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
active = true;
this.lastAcceptedNodes = lastAcceptedNodes;
leader = Optional.empty();
updateNextWarningTime();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
}

onFoundPeersUpdated(); // trigger a check for a quorum already
}

/**
 * Re-arms the cluster-formation warning: the next warning time becomes the thread pool's current relative time plus the
 * configured warning timeout. Must be called with the PeerFinder mutex held.
 */
private void updateNextWarningTime() {
    assert holdsLock() : "PeerFinder mutex not held";
    final long nowMillis = transportService.getThreadPool().relativeTimeInMillis();
    final long timeoutMillis = clusterFormationWarningTimeout.millis();
    nextWarningTimeMillis = nowMillis + timeoutMillis;
}

public void deactivate(DiscoveryNode leader) {
final boolean peersRemoved;
synchronized (mutex) {
Expand Down Expand Up @@ -164,7 +179,7 @@ PeersResponse handlePeersRequest(PeersRequest peersRequest) {
knownPeers = getFoundPeersUnderLock();
} else {
assert leader.isPresent() || lastAcceptedNodes == null;
knownPeers = Collections.emptyList();
knownPeers = emptyList();
}
return new PeersResponse(leader, knownPeers, currentTerm);
}
Expand Down Expand Up @@ -266,6 +281,7 @@ private boolean handleWakeUp() {

configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
synchronized (mutex) {
lastResolvedAddresses = providedAddresses;
logger.trace("probing resolved transport addresses {}", providedAddresses);
providedAddresses.forEach(this::startProbe);
}
Expand Down Expand Up @@ -293,6 +309,11 @@ protected void doRun() {
onFoundPeersUpdated();
}

@Override
public void onAfter() {
// Give the cluster-formation warning a chance to fire on each wake-up. The callee asserts that the PeerFinder mutex is
// not held and acquires it itself.
maybeWarnClusterFormationFailed();
}

@Override
public String toString() {
return "PeerFinder handling wakeup";
Expand All @@ -302,6 +323,26 @@ public String toString() {
return peersRemoved;
}

/**
 * Invokes {@link #warnClusterFormationFailed} if this PeerFinder is still active and the warning deadline has passed, and
 * re-arms the deadline when it does so.
 * <p>
 * The state handed to the hook is captured under the mutex, but the hook itself is called after the mutex has been
 * released. Must be called without the PeerFinder mutex held.
 */
private void maybeWarnClusterFormationFailed() {
    assert holdsLock() == false : "PeerFinder mutex held in error";
    final DiscoveryNodes clusterStateNodes;
    final List<TransportAddress> resolvedAddresses;
    final List<DiscoveryNode> foundPeers;
    synchronized (mutex) {
        // This method is reached from the wake-up task's onAfter(), which does not itself check whether the finder is
        // still active. Without this guard a wake-up that runs once the finder is no longer active could still report
        // that no leader has been discovered.
        if (active == false) {
            return;
        }
        final long nowMillis = transportService.getThreadPool().relativeTimeInMillis();
        if (nextWarningTimeMillis >= nowMillis) {
            return; // deadline not yet passed
        }
        updateNextWarningTime();
        clusterStateNodes = lastAcceptedNodes;
        resolvedAddresses = lastResolvedAddresses;
        foundPeers = getFoundPeersUnderLock();
    }
    warnClusterFormationFailed(clusterStateNodes, resolvedAddresses, foundPeers);
}

/**
 * Hook invoked when the cluster-formation warning deadline has passed. The caller in this class snapshots the arguments
 * under the PeerFinder mutex and then calls this method after releasing it.
 *
 * @param clusterStateNodes the last-accepted nodes held by this PeerFinder at the time of the snapshot
 * @param resolvedAddresses the addresses most recently delivered by the configured hosts resolver
 * @param foundPeers        the peers found so far
 */
protected abstract void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers);

private void startProbe(TransportAddress transportAddress) {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
Expand Down Expand Up @@ -495,7 +536,7 @@ private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<U
@Override
public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(Collections.emptyList()));
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList()));
final PeersResponse peersResponse = handlePeersRequest(peersRequest);
final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.is;

public class ClusterBootstrapServiceTests extends ESTestCase {

Expand Down Expand Up @@ -197,4 +198,12 @@ public String toString() {
deterministicTaskQueue.runAllTasks();
// termination means success
}

/**
 * Checks the bootstrap description both for the fixture's service and for a service built from empty settings.
 */
public void testBootstrapDescription() {
    final String fixtureDescription = clusterBootstrapService.getBootstrapDescription();
    assertThat(fixtureDescription, is("discovery of at least 3 master-eligible nodes for cluster bootstrapping"));

    final ClusterBootstrapService serviceWithEmptySettings = new ClusterBootstrapService(Settings.EMPTY, transportService);
    final String emptySettingsDescription = serviceWithEmptySettings.getBootstrapDescription();
    assertThat(emptySettingsDescription, is("external cluster bootstrapping"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import java.util.HashSet;
import java.util.Set;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

public class CoordinationMetaDataTests extends ESTestCase {

Expand All @@ -46,6 +49,7 @@ public void testVotingConfiguration() {
assertThat(config0.isEmpty(), equalTo(true));
assertThat(config0.hasQuorum(Sets.newHashSet()), equalTo(false));
assertThat(config0.hasQuorum(Sets.newHashSet("id1")), equalTo(false));
assertThat(config0.getQuorumDescription(), is("cluster bootstrapping"));

VotingConfiguration config1 = new VotingConfiguration(Sets.newHashSet("id1"));
assertThat(config1.getNodeIds(), equalTo(Sets.newHashSet("id1")));
Expand All @@ -54,6 +58,7 @@ public void testVotingConfiguration() {
assertThat(config1.hasQuorum(Sets.newHashSet("id1", "id2")), equalTo(true));
assertThat(config1.hasQuorum(Sets.newHashSet("id2")), equalTo(false));
assertThat(config1.hasQuorum(Sets.newHashSet()), equalTo(false));
assertThat(config1.getQuorumDescription(), is("node with id [id1]"));

VotingConfiguration config2 = new VotingConfiguration(Sets.newHashSet("id1", "id2"));
assertThat(config2.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2")));
Expand All @@ -65,6 +70,7 @@ public void testVotingConfiguration() {
assertThat(config2.hasQuorum(Sets.newHashSet("id3")), equalTo(false));
assertThat(config2.hasQuorum(Sets.newHashSet("id1", "id3")), equalTo(false));
assertThat(config2.hasQuorum(Sets.newHashSet()), equalTo(false));
assertThat(config2.getQuorumDescription(), anyOf(is("two nodes with ids [id1, id2]"), is("two nodes with ids [id2, id1]")));

VotingConfiguration config3 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3"));
assertThat(config3.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2", "id3")));
Expand All @@ -80,6 +86,10 @@ public void testVotingConfiguration() {
assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4")), equalTo(false));
assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4", "id5")), equalTo(false));
assertThat(config3.hasQuorum(Sets.newHashSet()), equalTo(false));
assertThat(config3.getQuorumDescription(), startsWith("at least 2 nodes with ids from ["));

VotingConfiguration config4 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3", "id4"));
assertThat(config4.getQuorumDescription(), startsWith("at least 3 nodes with ids from ["));
}

public void testVotingConfigurationSerializationEqualsHashCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class CoordinationStateTests extends ESTestCase {

Expand Down Expand Up @@ -766,6 +767,16 @@ public void testVoteCollection() {
});
}

/**
 * Checks the quorum description when both configurations passed to the initial state are the same, and when they differ.
 */
public void testQuorumDescription() {
    final VotingConfiguration firstNodeOnly = new VotingConfiguration(Collections.singleton(node1.getId()));
    final VotingConfiguration secondNodeOnly = new VotingConfiguration(Collections.singleton(node2.getId()));

    // identical configurations are described only once
    cs1.setInitialState(clusterState(0L, 1L, node1, firstNodeOnly, firstNodeOnly, 42L));
    final String singleConfigDescription = cs1.getQuorumDescription();
    assertThat(singleConfigDescription, is("node with id [node1]"));

    // differing configurations are both described, joined by " and "
    cs2.setInitialState(clusterState(0L, 1L, node1, firstNodeOnly, secondNodeOnly, 42L));
    final String jointConfigDescription = cs2.getQuorumDescription();
    assertThat(jointConfigDescription, is("node with id [node2] and node with id [node1]"));
}

/**
 * Builds a cluster of between one and five nodes (chosen at random) and runs it randomly.
 */
public void testSafety() {
    final Cluster randomlySizedCluster = new Cluster(randomIntBetween(1, 5));
    randomlySizedCluster.runRandomly();
}
Expand Down Expand Up @@ -811,7 +822,7 @@ public static ClusterState setValue(ClusterState clusterState, long value) {
/**
 * Reads the {@code "value"} entry from the persistent settings in the given cluster state's metadata, passing
 * {@code 0L} as the default.
 *
 * @param clusterState the cluster state to inspect
 * @return the value read from the persistent settings
 */
public static long value(ClusterState clusterState) {
    return clusterState
        .metaData()
        .persistentSettings()
        .getAsLong("value", 0L);
}

static class ClusterNode {

final DiscoveryNode localNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public String toString() {
class TestPeerFinder extends PeerFinder {
DiscoveryNode discoveredMasterNode;
OptionalLong discoveredMasterTerm = OptionalLong.empty();
boolean emittedWarning;

TestPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector) {
super(settings, transportService, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts);
Expand All @@ -172,6 +173,12 @@ protected void onFoundPeersUpdated() {
foundPeersFromNotification = getFoundPeers();
logger.trace("onFoundPeersUpdated({})", foundPeersFromNotification);
}

/**
 * Test hook: records that a cluster-formation warning was requested. The arguments are ignored.
 */
@Override
protected void warnClusterFormationFailed(
    DiscoveryNodes clusterStateNodes,
    List<TransportAddress> resolvedAddresses,
    List<DiscoveryNode> foundPeers
) {
    this.emittedWarning = true;
}
}

private void resolveConfiguredHosts(Consumer<List<TransportAddress>> onResult) {
Expand Down Expand Up @@ -773,6 +780,39 @@ public void testReconnectsToDisconnectedNodes() {
assertFoundPeers(rebootedOtherNode);
}

/**
 * Verifies that an activated PeerFinder emits a warning once the warning timeout (plus one find-peers interval) has
 * elapsed, stays quiet for the following three find-peers intervals, and warns again after a further timeout.
 */
public void testEmitsWarningsIfClusterDoesNotFormFastEnough() {
    peerFinder.activate(lastAcceptedNodes);
    assertFalse(peerFinder.emittedWarning);

    final long findPeersIntervalMillis = PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();
    final long warningTimeout =
        PeerFinder.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(Settings.EMPTY).millis() + findPeersIntervalMillis;

    // first warning
    advanceAndRunTasksUntil(deterministicTaskQueue.getCurrentTimeMillis() + warningTimeout);
    assertTrue(peerFinder.emittedWarning);
    peerFinder.emittedWarning = false;

    // both deadlines are measured from the same instant, just after the first warning
    final long afterFirstWarningMillis = deterministicTaskQueue.getCurrentTimeMillis();
    final long quietUntilMillis = afterFirstWarningMillis + findPeersIntervalMillis * 3;
    final long secondWarningByMillis = afterFirstWarningMillis + warningTimeout;

    advanceAndRunTasksUntil(quietUntilMillis);
    assertFalse(peerFinder.emittedWarning);

    advanceAndRunTasksUntil(secondWarningByMillis);
    assertTrue(peerFinder.emittedWarning);
}

/** Advances the deterministic clock, running all runnable tasks after each step, until it reaches the given time. */
private void advanceAndRunTasksUntil(long targetTimeMillis) {
    while (deterministicTaskQueue.getCurrentTimeMillis() < targetTimeMillis) {
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
    }
}

private void respondToRequests(Function<DiscoveryNode, PeersResponse> responseFactory) {
final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear();
for (final CapturedRequest capturedRequest : capturedRequests) {
Expand Down