diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java new file mode 100644 index 0000000000000..3fef7415739fd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -0,0 +1,348 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.Discovery.AckListener; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.LongSupplier; + +public abstract class Publication extends AbstractComponent { + + private final List publicationTargets; + private final PublishRequest publishRequest; + private final AckListener ackListener; + private final LongSupplier currentTimeSupplier; + private final long startTime; + + private Optional applyCommitRequest; // set when state is committed + private boolean isCompleted; // set when publication is completed + private boolean timedOut; // set when publication timed out + + public Publication(Settings settings, PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { + super(settings); + this.publishRequest = publishRequest; + this.ackListener = ackListener; + this.currentTimeSupplier = currentTimeSupplier; + startTime = currentTimeSupplier.getAsLong(); + applyCommitRequest = Optional.empty(); + publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); + publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); + } + + public void start(Set faultyNodes) { + logger.trace("publishing {} to {}", publishRequest, 
publicationTargets); + + for (final DiscoveryNode faultyNode : faultyNodes) { + onFaultyNode(faultyNode); + } + onPossibleCommitFailure(); + publicationTargets.forEach(PublicationTarget::sendPublishRequest); + } + + public void onTimeout() { + assert timedOut == false; + timedOut = true; + if (applyCommitRequest.isPresent() == false) { + logger.debug("onTimeout: [{}] timed out before committing", this); + // fail all current publications + final Exception e = new ElasticsearchException("publication timed out before committing"); + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); + } + onPossibleCompletion(); + } + + public void onFaultyNode(DiscoveryNode faultyNode) { + publicationTargets.forEach(t -> t.onFaultyNode(faultyNode)); + onPossibleCompletion(); + } + + private void onPossibleCompletion() { + if (isCompleted) { + return; + } + + if (timedOut == false) { + for (final PublicationTarget target : publicationTargets) { + if (target.isActive()) { + return; + } + } + } + + if (applyCommitRequest.isPresent() == false) { + logger.debug("onPossibleCompletion: [{}] commit failed", this); + assert isCompleted == false; + isCompleted = true; + onCompletion(false); + return; + } + + assert isCompleted == false; + isCompleted = true; + onCompletion(true); + assert applyCommitRequest.isPresent(); + logger.trace("onPossibleCompletion: [{}] was successful", this); + } + + // For assertions only: verify that this invariant holds + private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() { + if (timedOut == false) { + for (final PublicationTarget target : publicationTargets) { + if (target.isActive()) { + return isCompleted == false; + } + } + } + return isCompleted; + } + + private void onPossibleCommitFailure() { + if (applyCommitRequest.isPresent()) { + onPossibleCompletion(); + return; + } + + final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); + for 
(PublicationTarget publicationTarget : publicationTargets) { + if (publicationTarget.mayCommitInFuture()) { + possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode); + } else { + assert publicationTarget.isFailed() : publicationTarget; + } + } + + if (isPublishQuorum(possiblySuccessfulNodes) == false) { + logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed", + possiblySuccessfulNodes, this); + Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); + onPossibleCompletion(); + } + } + + protected abstract void onCompletion(boolean committed); + + protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes); + + protected abstract Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse); + + protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response); + + protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener); + + protected abstract void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, + ActionListener responseActionListener); + + @Override + public String toString() { + return "Publication{term=" + publishRequest.getAcceptedState().term() + + ", version=" + publishRequest.getAcceptedState().version() + '}'; + } + + enum PublicationTargetState { + NOT_STARTED, + FAILED, + SENT_PUBLISH_REQUEST, + WAITING_FOR_QUORUM, + SENT_APPLY_COMMIT, + APPLIED_COMMIT, + } + + class PublicationTarget { + private final DiscoveryNode discoveryNode; + private boolean ackIsPending = true; + private PublicationTargetState state = PublicationTargetState.NOT_STARTED; + + PublicationTarget(DiscoveryNode discoveryNode) { + this.discoveryNode = discoveryNode; + } + + @Override + public 
String toString() { + return "PublicationTarget{" + + "discoveryNode=" + discoveryNode + + ", state=" + state + + ", ackIsPending=" + ackIsPending + + '}'; + } + + void sendPublishRequest() { + if (isFailed()) { + return; + } + assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST; + state = PublicationTargetState.SENT_PUBLISH_REQUEST; + Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); + // TODO Can this ^ fail with an exception? Target should be failed if so. + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + } + + void handlePublishResponse(PublishResponse publishResponse) { + assert isWaitingForQuorum() : this; + logger.trace("handlePublishResponse: handling [{}] from [{}]", publishResponse, discoveryNode); + if (applyCommitRequest.isPresent()) { + sendApplyCommit(); + } else { + Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> { + assert applyCommitRequest.isPresent() == false; + applyCommitRequest = Optional.of(applyCommit); + ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); + publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); + }); + } + } + + void sendApplyCommit() { + assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT; + state = PublicationTargetState.SENT_APPLY_COMMIT; + assert applyCommitRequest.isPresent(); + Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler()); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + } + + void setAppliedCommit() { + assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT; + state = PublicationTargetState.APPLIED_COMMIT; + ackOnce(null); + } + + void 
setFailed(Exception e) { + assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED; + state = PublicationTargetState.FAILED; + ackOnce(e); + } + + void onFaultyNode(DiscoveryNode faultyNode) { + if (isActive() && discoveryNode.equals(faultyNode)) { + logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this); + setFailed(new ElasticsearchException("faulty node")); + onPossibleCommitFailure(); + } + } + + private void ackOnce(Exception e) { + if (ackIsPending) { + ackIsPending = false; + ackListener.onNodeAck(discoveryNode, e); + } + } + + boolean isActive() { + return state != PublicationTargetState.FAILED + && state != PublicationTargetState.APPLIED_COMMIT; + } + + boolean isWaitingForQuorum() { + return state == PublicationTargetState.WAITING_FOR_QUORUM; + } + + boolean mayCommitInFuture() { + return (state == PublicationTargetState.NOT_STARTED + || state == PublicationTargetState.SENT_PUBLISH_REQUEST + || state == PublicationTargetState.WAITING_FOR_QUORUM); + } + + boolean isFailed() { + return state == PublicationTargetState.FAILED; + } + + private class PublishResponseHandler implements ActionListener { + + @Override + public void onResponse(PublishWithJoinResponse response) { + if (isFailed()) { + logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + return; + } + + // TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join. 
+ onPossibleJoin(discoveryNode, response); + + assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM; + state = PublicationTargetState.WAITING_FOR_QUORUM; + handlePublishResponse(response.getPublishResponse()); + + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + } + + @Override + public void onFailure(Exception e) { + assert e instanceof TransportException; + final TransportException exp = (TransportException) e; + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { + logger.debug("PublishResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage()); + } else { + logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp); + } + assert ((TransportException) e).getRootCause() instanceof Exception; + setFailed((Exception) exp.getRootCause()); + onPossibleCommitFailure(); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + } + + } + + private class ApplyCommitResponseHandler implements ActionListener { + + @Override + public void onResponse(TransportResponse.Empty ignored) { + if (isFailed()) { + logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]", + discoveryNode); + return; + } + setAppliedCommit(); + onPossibleCompletion(); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + } + + @Override + public void onFailure(Exception e) { + assert e instanceof TransportException; + final TransportException exp = (TransportException) e; + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { + logger.debug("ApplyCommitResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage()); + } else { + logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp); + } + assert ((TransportException) e).getRootCause() instanceof Exception; + setFailed((Exception) 
exp.getRootCause()); + onPossibleCompletion(); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java index f213657ed2e7a..be7c11857021a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java @@ -20,25 +20,43 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; /** * Response to a {@link PublishRequest}, carrying the term and version of the request. + * Typically wrapped in a {@link PublishWithJoinResponse}. */ -public class PublishResponse extends TermVersionResponse { +public class PublishResponse implements Writeable { + + protected final long term; + protected final long version; public PublishResponse(long term, long version) { - super(term, version); + assert term >= 0; + assert version >= 0; + + this.term = term; + this.version = version; } public PublishResponse(StreamInput in) throws IOException { - super(in); + this(in.readLong(), in.readLong()); } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + out.writeLong(term); + out.writeLong(version); + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; } @Override @@ -48,4 +66,22 @@ public String toString() { ", version=" + version + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PublishResponse response = (PublishResponse) o; + + if (term != response.term) return false; + return version == response.version; + } + + @Override + public int hashCode() { + int 
result = (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + return result; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java new file mode 100644 index 0000000000000..8628177895ad5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Optional; + +/** + * Response to a {@link PublishRequest}. Encapsulates both a {@link PublishResponse} + * and an optional {@link Join}. 
+ */ +public class PublishWithJoinResponse extends TransportResponse { + private final PublishResponse publishResponse; + private final Optional optionalJoin; + + public PublishWithJoinResponse(PublishResponse publishResponse, Optional optionalJoin) { + this.publishResponse = publishResponse; + this.optionalJoin = optionalJoin; + } + + public PublishWithJoinResponse(StreamInput in) throws IOException { + this.publishResponse = new PublishResponse(in); + this.optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + publishResponse.writeTo(out); + out.writeOptionalWriteable(optionalJoin.orElse(null)); + } + + public PublishResponse getPublishResponse() { + return publishResponse; + } + + public Optional getJoin() { + return optionalJoin; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PublishWithJoinResponse)) return false; + + PublishWithJoinResponse that = (PublishWithJoinResponse) o; + + if (!publishResponse.equals(that.publishResponse)) return false; + return optionalJoin.equals(that.optionalJoin); + } + + @Override + public int hashCode() { + int result = publishResponse.hashCode(); + result = 31 * result + optionalJoin.hashCode(); + return result; + } + + @Override + public String toString() { + return "PublishWithJoinResponse{" + + "publishResponse=" + publishResponse + + ", optionalJoin=" + optionalJoin + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java deleted file mode 100644 index 5eba2e6b732a5..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.cluster.coordination; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.transport.TransportResponse; - -import java.io.IOException; - -abstract class TermVersionResponse extends TransportResponse { - protected final long term; - protected final long version; - - TermVersionResponse(long term, long version) { - assert term >= 0; - assert version >= 0; - - this.term = term; - this.version = version; - } - - TermVersionResponse(StreamInput in) throws IOException { - this(in.readLong(), in.readLong()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(term); - out.writeLong(version); - } - - public long getTerm() { - return term; - } - - public long getVersion() { - return version; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TermVersionResponse response = (TermVersionResponse) o; - - if (term != response.term) return false; - return version == response.version; - } - - @Override - public int hashCode() { - int result = (int) (term ^ (term >>> 32)); - result = 31 * result + (int) (version ^ (version >>> 
32)); - return result; - } - - @Override - public String toString() { - return "TermVersionResponse{" + - "term=" + term + - ", version=" + version + - '}'; - } -} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index e7f61efa69054..e39e14840ddf1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -74,7 +74,7 @@ public void setupNodes() { cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node3); } - private DiscoveryNode createNode(String id) { + public static DiscoveryNode createNode(String id) { return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 7fa4a3217348f..1770770e02f78 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -25,6 +25,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; +import java.util.Optional; + public class MessagesTests extends ESTestCase { private DiscoveryNode createNode(String id) { @@ -96,6 +98,33 @@ public void testPublishResponseEqualsHashCodeSerialization() { }); } + public void testPublishWithJoinResponseEqualsHashCodeSerialization() { + PublishResponse initialPublishResponse = new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()); + Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()); + 
PublishWithJoinResponse initialPublishWithJoinResponse = new PublishWithJoinResponse(initialPublishResponse, + randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishWithJoinResponse, + publishWithJoinResponse -> copyWriteable(publishWithJoinResponse, writableRegistry(), PublishWithJoinResponse::new), + publishWithJoinResponse -> { + switch (randomInt(1)) { + case 0: + // change publish response + return new PublishWithJoinResponse(new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()), + publishWithJoinResponse.getJoin()); + case 1: + // change optional join + Join newJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + return new PublishWithJoinResponse(publishWithJoinResponse.getPublishResponse(), + publishWithJoinResponse.getJoin().isPresent() && randomBoolean() ? Optional.empty() : Optional.of(newJoin)); + default: + throw new AssertionError(); + } + }); + } + public void testStartJoinRequestEqualsHashCodeSerialization() { StartJoinRequest initialStartJoinRequest = new StartJoinRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong()); EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialStartJoinRequest, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java new file mode 100644 index 0000000000000..19c7f436c4f8f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -0,0 +1,430 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class PublicationTests extends ESTestCase { + + class MockNode { + + MockNode(Settings settings, DiscoveryNode localNode) { + this.localNode = localNode; + ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode, + VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L); + coordinationState = new CoordinationState(settings, localNode, new CoordinationStateTests.InMemoryPersistedState(0L, + initialState)); + } + + final DiscoveryNode localNode; + + final CoordinationState coordinationState; + + public MockPublication publish(ClusterState clusterState, Discovery.AckListener ackListener, Set faultyNodes) { + PublishRequest publishRequest = coordinationState.handleClientValue(clusterState); + MockPublication currentPublication = new MockPublication(Settings.EMPTY, publishRequest, ackListener, () -> 0L) { + @Override + protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { + return coordinationState.isPublishQuorum(votes); + } + + @Override + protected Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { + return coordinationState.handlePublishResponse(sourceNode, publishResponse); + } + }; + currentPublication.start(faultyNodes); + return currentPublication; + } + } + + abstract class MockPublication extends Publication { + + final PublishRequest publishRequest; + + ApplyCommitRequest applyCommit; + + boolean completed; + + boolean committed; + + Map> pendingPublications = new HashMap<>(); + Map> pendingCommits = new HashMap<>(); + Map possibleJoins = new HashMap<>(); + + MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, + LongSupplier currentTimeSupplier) { + super(settings, publishRequest, ackListener, currentTimeSupplier); + this.publishRequest = publishRequest; + } + + @Override + protected void onCompletion(boolean 
committed) { + assertFalse(completed); + completed = true; + this.committed = committed; + } + + @Override + protected void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response) { + assertNull(possibleJoins.put(sourceNode, response)); + } + + @Override + protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener) { + assertSame(publishRequest, this.publishRequest); + assertNull(pendingPublications.put(destination, responseActionListener)); + } + + @Override + protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, + ActionListener responseActionListener) { + if (this.applyCommit == null) { + this.applyCommit = applyCommit; + } else { + assertSame(applyCommit, this.applyCommit); + } + assertNull(pendingCommits.put(destination, responseActionListener)); + } + } + + DiscoveryNode n1 = CoordinationStateTests.createNode("node1"); + DiscoveryNode n2 = CoordinationStateTests.createNode("node2"); + DiscoveryNode n3 = CoordinationStateTests.createNode("node3"); + Set discoNodes = Sets.newHashSet(n1, n2, n3); + + MockNode node1 = new MockNode(Settings.EMPTY, n1); + MockNode node2 = new MockNode(Settings.EMPTY, n2); + MockNode node3 = new MockNode(Settings.EMPTY, n3); + List nodes = Arrays.asList(node1, node2, node3); + + Function nodeResolver = dn -> nodes.stream().filter(mn -> mn.localNode.equals(dn)).findFirst().get(); + + private void initializeCluster(VotingConfiguration initialConfig) { + node1.coordinationState.setInitialState(CoordinationStateTests.clusterState(0L, 1L, n1, initialConfig, initialConfig, 0L)); + StartJoinRequest startJoinRequest = new StartJoinRequest(n1, 1L); + node1.coordinationState.handleJoin(node1.coordinationState.handleStartJoin(startJoinRequest)); + node1.coordinationState.handleJoin(node2.coordinationState.handleStartJoin(startJoinRequest)); + 
node1.coordinationState.handleJoin(node3.coordinationState.handleStartJoin(startJoinRequest)); + assertTrue(node1.coordinationState.electionWon()); + } + + /** Publishes a state from node1 to n1, n2 and n3 with only n1 in the voting configuration, delivers the publish and commit responses in shuffled order (optionally holding back the publish response of n2), and asserts that the publication is completed and committed only after every node has responded and that all three nodes are acked. */ public void testSimpleClusterStatePublishing() throws InterruptedException { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet()); + + assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes)); + assertTrue(publication.pendingCommits.isEmpty()); + AtomicBoolean processedNode1PublishResponse = new AtomicBoolean(); + boolean delayProcessingNode2PublishResponse = randomBoolean(); + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (delayProcessingNode2PublishResponse && e.getKey().equals(n2)) { + return; + } + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + /* pendingCommits must be non-empty exactly when the publish response of n1, the only node in the voting configuration, has been processed */ assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); + assertFalse(publication.possibleJoins.containsKey(e.getKey())); + PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse, + randomBoolean() ? 
Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()))); + e.getValue().onResponse(publishWithJoinResponse); + assertTrue(publication.possibleJoins.containsKey(e.getKey())); + assertEquals(publishWithJoinResponse, publication.possibleJoins.get(e.getKey())); + if (e.getKey().equals(n1)) { + processedNode1PublishResponse.set(true); + } + assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); + }); + + if (delayProcessingNode2PublishResponse) { + assertThat(publication.pendingCommits.keySet(), equalTo(Sets.newHashSet(n1, n3))); + } else { + assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); + } + assertNotNull(publication.applyCommit); + assertEquals(publication.applyCommit.getTerm(), publication.publishRequest.getAcceptedState().term()); + assertEquals(publication.applyCommit.getVersion(), publication.publishRequest.getAcceptedState().version()); + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + assertFalse(publication.completed); + assertFalse(publication.committed); + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + }); + + /* deliver the delayed publish response of n2 now; the publication is asserted to stay incomplete until the commit response of n2 arrives as well */ if (delayProcessingNode2PublishResponse) { + assertFalse(publication.completed); + assertFalse(publication.committed); + PublishResponse publishResponse = nodeResolver.apply(n2).coordinationState.handlePublishRequest( + publication.publishRequest); + publication.pendingPublications.get(n2).onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); + + assertFalse(publication.completed); + assertFalse(publication.committed); + publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE); + } + + assertTrue(publication.completed); + 
assertTrue(publication.committed); + + assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3)); + } + + /** Marks n2 as faulty at one randomly chosen point before any commit response is delivered (either through the initial faulty-node set passed to publish or while the publish responses are being delivered) and asserts that the publication is still completed and committed, with n2 reported to the ack listener as the single error. */ public void testClusterStatePublishingWithFaultyNodeBeforeCommit() throws InterruptedException { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + + AtomicInteger remainingActions = new AtomicInteger(4); // number of publish actions + initial faulty nodes injection + int injectFaultAt = randomInt(remainingActions.get() - 1); + logger.info("Injecting fault at: {}", injectFaultAt); + + /* first of the four injection points: n2 is already faulty when the publication starts */ Set initialFaultyNodes = remainingActions.decrementAndGet() == injectFaultAt ? + Collections.singleton(n2) : Collections.emptySet(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, initialFaultyNodes); + + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (remainingActions.decrementAndGet() == injectFaultAt) { + publication.onFaultyNode(n2); + } + if (e.getKey().equals(n2) == false || randomBoolean()) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + }); + + assertTrue(publication.completed); + assertTrue(publication.committed); + + publication.onFaultyNode(randomFrom(n1, n3)); // has no influence + + List> 
errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); + assertThat(errors.size(), equalTo(1)); + assertThat(errors.get(0).v1(), equalTo(n2)); + assertThat(errors.get(0).v2().getMessage(), containsString("faulty node")); + } + + /** Marks n2 as faulty while the commit responses are being delivered or right after them (optionally without n2 ever having answered the publish request) and asserts that the publication is completed and committed with n2 as the only error reported to the ack listener. */ public void testClusterStatePublishingWithFaultyNodeAfterCommit() throws InterruptedException { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + + boolean publicationDidNotMakeItToNode2 = randomBoolean(); + AtomicInteger remainingActions = new AtomicInteger(publicationDidNotMakeItToNode2 ? 2 : 3); + int injectFaultAt = randomInt(remainingActions.get() - 1); + logger.info("Injecting fault at: {}, publicationDidNotMakeItToNode2: {}", injectFaultAt, publicationDidNotMakeItToNode2); + + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet()); + + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n2) == false || publicationDidNotMakeItToNode2 == false) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n2)) { + // we must fail node before committing for the node, otherwise failing the node is ignored + publication.onFaultyNode(n2); + } + if (remainingActions.decrementAndGet() == injectFaultAt) { + publication.onFaultyNode(n2); + } + if (e.getKey().equals(n2) == false || randomBoolean()) { 
+ nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + } + }); + + // we need to complete publication by failing the node + if (publicationDidNotMakeItToNode2 && remainingActions.get() > injectFaultAt) { + publication.onFaultyNode(n2); + } + + assertTrue(publication.completed); + assertTrue(publication.committed); + + publication.onFaultyNode(randomFrom(n1, n3)); // has no influence + + List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); + assertThat(errors.size(), equalTo(1)); + assertThat(errors.get(0).v1(), equalTo(n2)); + assertThat(errors.get(0).v2().getMessage(), containsString("faulty node")); + } + + /** With n1 and n2 in the voting configuration, either times out the publication or fails the publish request to n2 when the entry of n2 is reached, and asserts that the publication completes without committing: no commit is pending, applyCommit stays null, and all three nodes are reported to the ack listener as errors. */ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws InterruptedException { + VotingConfiguration config = new VotingConfiguration(Sets.newHashSet(n1.getId(), n2.getId())); + initializeCluster(config); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, config, config, 42L), ackListener, Collections.emptySet()); + + boolean timeOut = randomBoolean(); + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n2)) { + if (timeOut) { + publication.onTimeout(); + } else { + e.getValue().onFailure(new TransportException(new Exception("dummy failure"))); + } + assertTrue(publication.completed); + assertFalse(publication.committed); + } else if (randomBoolean()) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + 
assertThat(publication.pendingCommits.keySet(), equalTo(Collections.emptySet())); + assertNull(publication.applyCommit); + assertTrue(publication.completed); + assertFalse(publication.committed); + + List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); + assertThat(errors.size(), equalTo(3)); + assertThat(errors.stream().map(Tuple::v1).collect(Collectors.toList()), containsInAnyOrder(n1, n2, n3)); + errors.stream().forEach(tuple -> + assertThat(tuple.v2().getMessage(), containsString(timeOut ? "timed out" : + tuple.v1().equals(n2) ? "dummy failure" : "non-failed nodes do not form a quorum"))); + } + + /** Delivers publish responses from n1 and n2 (and optionally n3), lets a random subset of nodes acknowledge the commit, then times the publication out and asserts that it is completed and committed with exactly that subset acked; afterwards delivers the outstanding responses and asserts that every node ends up acked. */ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { + VotingConfiguration config = new VotingConfiguration(randomBoolean() ? + Sets.newHashSet(n1.getId(), n2.getId()) : Sets.newHashSet(n1.getId(), n2.getId(), n3.getId())); + initializeCluster(config); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, config, config, 42L), ackListener, Collections.emptySet()); + + boolean publishedToN3 = randomBoolean(); + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n3) == false || publishedToN3) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + assertNotNull(publication.applyCommit); + + Set committingNodes = new HashSet<>(randomSubsetOf(discoNodes)); + if (publishedToN3 == false) { + committingNodes.remove(n3); + } + + logger.info("Committing nodes: {}", committingNodes); + + 
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + if (committingNodes.contains(e.getKey())) { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + } + }); + + publication.onTimeout(); + assertTrue(publication.completed); + assertTrue(publication.committed); + assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS)); + + // check that acking still works after publication completed + if (publishedToN3 == false) { + publication.pendingPublications.get(n3).onResponse( + new PublishWithJoinResponse(node3.coordinationState.handlePublishRequest(publication.publishRequest), Optional.empty())); + } + + assertEquals(discoNodes, publication.pendingCommits.keySet()); + + Set nonCommittedNodes = Sets.difference(discoNodes, committingNodes); + logger.info("Non-committed nodes: {}", nonCommittedNodes); + nonCommittedNodes.stream().collect(shuffle()).forEach(n -> + publication.pendingCommits.get(n).onResponse(TransportResponse.Empty.INSTANCE)); + + assertEquals(discoNodes, ackListener.await(0L, TimeUnit.SECONDS)); + } + + /** Returns a collector that gathers the elements into a list, shuffles that list with the randomness source returned by random(), and hands back a stream over the shuffled list. */ public static Collector> shuffle() { + return Collectors.collectingAndThen(Collectors.toList(), + ts -> { + Collections.shuffle(ts, random()); + return ts.stream(); + }); + } + + +} diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 0503a4f819d23..57bde7f70ccbc 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -65,8 +65,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import 
java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -816,6 +818,7 @@ public AssertingAckListener publishState(PublishClusterStateAction action, Clust public static class AssertingAckListener implements Discovery.AckListener { private final List> errors = new CopyOnWriteArrayList<>(); + private final Set successfulAcks = Collections.synchronizedSet(new HashSet<>()); private final CountDownLatch countDown; private final CountDownLatch commitCountDown; @@ -833,13 +836,16 @@ public void onCommit(TimeValue commitTime) { public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { if (e != null) { errors.add(new Tuple<>(node, e)); + } else { + successfulAcks.add(node); } countDown.countDown(); } - public void await(long timeout, TimeUnit unit) throws InterruptedException { + public Set await(long timeout, TimeUnit unit) throws InterruptedException { assertThat(awaitErrors(timeout, unit), emptyIterable()); assertTrue(commitCountDown.await(timeout, unit)); + return new HashSet<>(successfulAcks); } public List> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {