diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 9a02b76b3e038..01c24acccdd81 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -1024,8 +1025,10 @@ private enum ElasticsearchExceptionHandle { UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.UnknownNamedObjectException.class, org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0), TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class, - MultiBucketConsumerService.TooManyBucketsException::new, 149, - Version.V_7_0_0_alpha1); + MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1), + COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class, + CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1); + final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java new file mode 100644 index 0000000000000..02cffbb4c4d3c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A master node sends this request to its peers to inform them that it could commit the + * cluster state with the given term and version. Peers that have accepted the given cluster + * state will then consider it as committed and proceed to apply the state locally. + */ +public class ApplyCommitRequest extends TermVersionRequest { + + public ApplyCommitRequest(DiscoveryNode sourceNode, long term, long version) { + super(sourceNode, term, version); + } + + public ApplyCommitRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public String toString() { + return "ApplyCommitRequest{" + + "term=" + term + + ", version=" + version + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java new file mode 100644 index 0000000000000..afbf6363618a5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -0,0 +1,485 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * The core class of the cluster state coordination algorithm, directly implementing the + * formal model + */ +public class CoordinationState extends AbstractComponent { + + private final DiscoveryNode localNode; + + // persisted state + private final PersistedState persistedState; + + // transient state + private VoteCollection joinVotes; + private boolean startedJoinSinceLastReboot; + private boolean electionWon; + private long lastPublishedVersion; + private VotingConfiguration lastPublishedConfiguration; + private VoteCollection publishVotes; + + public CoordinationState(Settings settings, DiscoveryNode localNode, PersistedState persistedState) { + super(settings); + + this.localNode = localNode; + + // persisted state + this.persistedState = persistedState; + + // transient state + this.joinVotes = new VoteCollection(); + this.startedJoinSinceLastReboot = false; + this.electionWon = false; + this.lastPublishedVersion = 0L; + this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration(); + this.publishVotes = new VoteCollection(); + } + + public long getCurrentTerm() { + return persistedState.getCurrentTerm(); + } + + public ClusterState getLastAcceptedState() { + return persistedState.getLastAcceptedState(); + } + + public long getLastAcceptedTerm() { + return getLastAcceptedState().term(); + } + + public long getLastAcceptedVersion() { + return getLastAcceptedState().version(); + } + + public VotingConfiguration getLastCommittedConfiguration() { + return getLastAcceptedState().getLastCommittedConfiguration(); + } + + public VotingConfiguration getLastAcceptedConfiguration() { + return getLastAcceptedState().getLastAcceptedConfiguration(); + } + + public long getLastPublishedVersion() { + return lastPublishedVersion; + } + + public boolean electionWon() { + return electionWon; + } + + public boolean isElectionQuorum(VoteCollection votes) { + return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(getLastAcceptedConfiguration()); + } + + public boolean isPublishQuorum(VoteCollection votes) { + return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(lastPublishedConfiguration); + } + + public boolean containsJoinVoteFor(DiscoveryNode node) { + return joinVotes.containsVoteFor(node); + } + + public boolean joinVotesHaveQuorumFor(VotingConfiguration votingConfiguration) { + return joinVotes.isQuorum(votingConfiguration); + } + + /** + * Used to bootstrap a cluster by injecting the initial state and configuration. + * + * @param initialState The initial state to use. Must have term 0, version 1, and non-empty configurations. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public void setInitialState(ClusterState initialState) { + final long lastAcceptedVersion = getLastAcceptedVersion(); + if (lastAcceptedVersion != 0) { + logger.debug("setInitialState: rejecting since last-accepted version {} > 0", lastAcceptedVersion); + throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion); + } + + assert getLastAcceptedTerm() == 0; + assert getLastAcceptedConfiguration().isEmpty(); + assert getLastCommittedConfiguration().isEmpty(); + assert lastPublishedVersion == 0; + assert lastPublishedConfiguration.isEmpty(); + assert electionWon == false; + assert joinVotes.isEmpty(); + assert publishVotes.isEmpty(); + + assert initialState.term() == 0; + assert initialState.version() == 1; + assert initialState.getLastAcceptedConfiguration().isEmpty() == false; + assert initialState.getLastCommittedConfiguration().isEmpty() == false; + + persistedState.setLastAcceptedState(initialState); + } + + /** + * May be safely called at any time to move this instance to a new term. + * + * @param startJoinRequest The startJoinRequest, specifying the node requesting the join. + * @return A Join that should be sent to the target node of the join. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public Join handleStartJoin(StartJoinRequest startJoinRequest) { + if (startJoinRequest.getTerm() <= getCurrentTerm()) { + logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]", + startJoinRequest.getTerm(), getCurrentTerm()); + throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() + + " not greater than current term " + getCurrentTerm()); + } + + logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm()); + + persistedState.setCurrentTerm(startJoinRequest.getTerm()); + assert getCurrentTerm() == startJoinRequest.getTerm(); + lastPublishedVersion = 0; + lastPublishedConfiguration = getLastAcceptedConfiguration(); + startedJoinSinceLastReboot = true; + electionWon = false; + joinVotes = new VoteCollection(); + publishVotes = new VoteCollection(); + + return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion()); + } + + /** + * May be called on receipt of a Join. + * + * @param join The Join received. + * @return true iff this instance does not already have a join vote from the given source node for this term + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public boolean handleJoin(Join join) { + assert join.getTargetNode().equals(localNode) : "handling join " + join + " for the wrong node " + localNode; + + if (join.getTerm() != getCurrentTerm()) { + logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])", + getCurrentTerm(), join.getTerm()); + throw new CoordinationStateRejectedException( + "incoming term " + join.getTerm() + " does not match current term " + getCurrentTerm()); + } + + if (startedJoinSinceLastReboot == false) { + logger.debug("handleJoin: ignored join as term was not incremented yet after reboot"); + throw new CoordinationStateRejectedException("ignored join as term has not been incremented yet after reboot"); + } + + final long lastAcceptedTerm = getLastAcceptedTerm(); + if (join.getLastAcceptedTerm() > lastAcceptedTerm) { + logger.debug("handleJoin: ignored join as joiner has a better last accepted term (expected: <=[{}], actual: [{}])", + lastAcceptedTerm, join.getLastAcceptedTerm()); + throw new CoordinationStateRejectedException("incoming last accepted term " + join.getLastAcceptedTerm() + + " of join higher than current last accepted term " + lastAcceptedTerm); + } + + if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersion()) { + logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}])", + getLastAcceptedVersion(), join.getLastAcceptedVersion()); + throw new CoordinationStateRejectedException("incoming last accepted version " + join.getLastAcceptedVersion() + + " of join higher than current last accepted version " + getLastAcceptedVersion()); + } + + if (getLastAcceptedVersion() == 0) { + // We do not check for an election won on setting the initial configuration, so it would be possible to end up in a state where + // we have enough join votes to have won the election immediately on setting the initial configuration. It'd be quite + // complicated to restore all the appropriate invariants when setting the initial configuration (it's not just electionWon) + // so instead we just reject join votes received prior to receiving the initial configuration. + logger.debug("handleJoin: ignored join because initial configuration not set"); + throw new CoordinationStateRejectedException("initial configuration not set"); + } + + boolean added = joinVotes.addVote(join.getSourceNode()); + boolean prevElectionWon = electionWon; + electionWon = isElectionQuorum(joinVotes); + logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join, + join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()); + + if (electionWon && prevElectionWon == false) { + lastPublishedVersion = getLastAcceptedVersion(); + } + return added; + } + + /** + * May be called in order to prepare publication of the given cluster state + * + * @param clusterState The cluster state to publish. + * @return A PublishRequest to publish the given cluster state + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public PublishRequest handleClientValue(ClusterState clusterState) { + if (electionWon == false) { + logger.debug("handleClientValue: ignored request as election not won"); + throw new CoordinationStateRejectedException("election not won"); + } + if (lastPublishedVersion != getLastAcceptedVersion()) { + logger.debug("handleClientValue: cannot start publishing next value before accepting previous one"); + throw new CoordinationStateRejectedException("cannot start publishing next value before accepting previous one"); + } + if (clusterState.term() != getCurrentTerm()) { + logger.debug("handleClientValue: ignored request due to term mismatch " + + "(expected: [term {} version >{}], actual: [term {} version {}])", + getCurrentTerm(), lastPublishedVersion, clusterState.term(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming term " + clusterState.term() + " does not match current term " + + getCurrentTerm()); + } + if (clusterState.version() <= lastPublishedVersion) { + logger.debug("handleClientValue: ignored request due to version mismatch " + + "(expected: [term {} version >{}], actual: [term {} version {}])", + getCurrentTerm(), lastPublishedVersion, clusterState.term(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming cluster state version " + clusterState.version() + + " lower or equal to last published version " + lastPublishedVersion); + } + + if (clusterState.getLastAcceptedConfiguration().equals(getLastAcceptedConfiguration()) == false + && getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()) == false) { + logger.debug("handleClientValue: only allow reconfiguration while not already reconfiguring"); + throw new CoordinationStateRejectedException("only allow reconfiguration while not already reconfiguring"); + } + if (joinVotesHaveQuorumFor(clusterState.getLastAcceptedConfiguration()) == false) { + logger.debug("handleClientValue: only allow reconfiguration if joinVotes have quorum for new config"); + throw new CoordinationStateRejectedException("only allow reconfiguration if joinVotes have quorum for new config"); + } + + assert clusterState.getLastCommittedConfiguration().equals(getLastCommittedConfiguration()) : + "last committed configuration should not change"; + + lastPublishedVersion = clusterState.version(); + lastPublishedConfiguration = clusterState.getLastAcceptedConfiguration(); + publishVotes = new VoteCollection(); + + logger.trace("handleClientValue: processing request for version [{}] and term [{}]", lastPublishedVersion, getCurrentTerm()); + + return new PublishRequest(clusterState); + } + + /** + * May be called on receipt of a PublishRequest. + * + * @param publishRequest The publish request received. + * @return A PublishResponse which can be sent back to the sender of the PublishRequest. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public PublishResponse handlePublishRequest(PublishRequest publishRequest) { + final ClusterState clusterState = publishRequest.getAcceptedState(); + if (clusterState.term() != getCurrentTerm()) { + logger.debug("handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])", + getCurrentTerm(), clusterState.term()); + throw new CoordinationStateRejectedException("incoming term " + clusterState.term() + " does not match current term " + + getCurrentTerm()); + } + if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) { + logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])", + getLastAcceptedVersion(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming version " + clusterState.version() + + " lower or equal to current version " + getLastAcceptedVersion()); + } + + logger.trace("handlePublishRequest: accepting publish request for version [{}] and term [{}]", + clusterState.version(), clusterState.term()); + persistedState.setLastAcceptedState(clusterState); + assert getLastAcceptedState() == clusterState; + + return new PublishResponse(clusterState.term(), clusterState.version()); + } + + /** + * May be called on receipt of a PublishResponse from the given sourceNode. + * + * @param sourceNode The sender of the PublishResponse received. + * @param publishResponse The PublishResponse received. + * @return An optional ApplyCommitRequest which, if present, may be broadcast to all peers, indicating that this publication + * has been accepted at a quorum of peers and is therefore committed. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { + if (electionWon == false) { + logger.debug("handlePublishResponse: ignored response as election not won"); + throw new CoordinationStateRejectedException("election not won"); + } + if (publishResponse.getTerm() != getCurrentTerm()) { + logger.debug("handlePublishResponse: ignored publish response due to term mismatch (expected: [{}], actual: [{}])", + getCurrentTerm(), publishResponse.getTerm()); + throw new CoordinationStateRejectedException("incoming term " + publishResponse.getTerm() + + " does not match current term " + getCurrentTerm()); + } + if (publishResponse.getVersion() != lastPublishedVersion) { + logger.debug("handlePublishResponse: ignored publish response due to version mismatch (expected: [{}], actual: [{}])", + lastPublishedVersion, publishResponse.getVersion()); + throw new CoordinationStateRejectedException("incoming version " + publishResponse.getVersion() + + " does not match current version " + lastPublishedVersion); + } + + logger.trace("handlePublishResponse: accepted publish response for version [{}] and term [{}] from [{}]", + publishResponse.getVersion(), publishResponse.getTerm(), sourceNode); + publishVotes.addVote(sourceNode); + if (isPublishQuorum(publishVotes)) { + logger.trace("handlePublishResponse: value committed for version [{}] and term [{}]", + publishResponse.getVersion(), publishResponse.getTerm()); + return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion())); + } + + return Optional.empty(); + } + + /** + * May be called on receipt of an ApplyCommitRequest. Updates the committed configuration accordingly. + * + * @param applyCommit The ApplyCommitRequest received. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public void handleCommit(ApplyCommitRequest applyCommit) { + if (applyCommit.getTerm() != getCurrentTerm()) { + logger.debug("handleCommit: ignored commit request due to term mismatch " + + "(expected: [term {} version {}], actual: [term {} version {}])", + getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getTerm(), applyCommit.getVersion()); + throw new CoordinationStateRejectedException("incoming term " + applyCommit.getTerm() + " does not match current term " + + getCurrentTerm()); + } + if (applyCommit.getTerm() != getLastAcceptedTerm()) { + logger.debug("handleCommit: ignored commit request due to term mismatch " + + "(expected: [term {} version {}], actual: [term {} version {}])", + getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getTerm(), applyCommit.getVersion()); + throw new CoordinationStateRejectedException("incoming term " + applyCommit.getTerm() + " does not match last accepted term " + + getLastAcceptedTerm()); + } + if (applyCommit.getVersion() != getLastAcceptedVersion()) { + logger.debug("handleCommit: ignored commit request due to version mismatch (term {}, expected: [{}], actual: [{}])", + getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getVersion()); + throw new CoordinationStateRejectedException("incoming version " + applyCommit.getVersion() + + " does not match current version " + getLastAcceptedVersion()); + } + + logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(), + applyCommit.getVersion()); + + persistedState.markLastAcceptedConfigAsCommitted(); + assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); + } + + /** + * Pluggable persistence layer for {@link CoordinationState}. + * + */ + public interface PersistedState { + + /** + * Returns the current term + */ + long getCurrentTerm(); + + /** + * Returns the last accepted cluster state + */ + ClusterState getLastAcceptedState(); + + /** + * Sets a new current term. + * After a successful call to this method, {@link #getCurrentTerm()} should return the last term that was set. + * The value returned by {@link #getLastAcceptedState()} should not be influenced by calls to this method. + */ + void setCurrentTerm(long currentTerm); + + /** + * Sets a new last accepted cluster state. + * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set. + * The value returned by {@link #getCurrentTerm()} should not be influenced by calls to this method. + */ + void setLastAcceptedState(ClusterState clusterState); + + /** + * Marks the last accepted cluster state as committed. + * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, + * with the last committed configuration now corresponding to the last accepted configuration. + */ + default void markLastAcceptedConfigAsCommitted() { + final ClusterState lastAcceptedState = getLastAcceptedState(); + setLastAcceptedState(ClusterState.builder(lastAcceptedState) + .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) + .build()); + } + } + + /** + * A collection of votes, used to calculate quorums. + */ + public static class VoteCollection { + + private final Map nodes; + + public boolean addVote(DiscoveryNode sourceNode) { + return nodes.put(sourceNode.getId(), sourceNode) == null; + } + + public VoteCollection() { + nodes = new HashMap<>(); + } + + public boolean isQuorum(VotingConfiguration configuration) { + return configuration.hasQuorum(nodes.keySet()); + } + + public boolean containsVoteFor(DiscoveryNode node) { + return nodes.containsKey(node.getId()); + } + + public boolean isEmpty() { + return nodes.isEmpty(); + } + + public Collection nodes() { + return Collections.unmodifiableCollection(nodes.values()); + } + + @Override + public String toString() { + return "VoteCollection{" + String.join(",", nodes.keySet()) + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + VoteCollection that = (VoteCollection) o; + + return nodes.equals(that.nodes); + } + + @Override + public int hashCode() { + return nodes.hashCode(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationStateRejectedException.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationStateRejectedException.java new file mode 100644 index 0000000000000..d9f420eacd7ef --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationStateRejectedException.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * This exception is thrown when rejecting state transitions on the {@link CoordinationState} object, + * for example when receiving a publish request with the wrong term or version. + * Occurrences of this exception don't always signal failures, but can often be just caused by the + * asynchronous, distributed nature of the system. They will, for example, naturally happen during + * leader election, if multiple nodes are trying to become leader at the same time. + */ +public class CoordinationStateRejectedException extends ElasticsearchException { + public CoordinationStateRejectedException(String msg, Object... args) { + super(msg, args); + } + + public CoordinationStateRejectedException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Join.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Join.java new file mode 100644 index 0000000000000..1c69c4a62b80a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Join.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Triggered by a {@link StartJoinRequest}, instances of this class represent join votes, + * and have a source and target node. The source node is the node that provides the vote, + * and the target node is the node for which this vote is cast. A node will only cast + * a single vote per term, and this for a unique target node. The vote also carries + * information about the current state of the node that provided the vote, so that + * the receiver of the vote can determine if it has a more up-to-date state than the + * source node. + */ +public class Join implements Writeable { + private final DiscoveryNode sourceNode; + private final DiscoveryNode targetNode; + private final long term; + private final long lastAcceptedTerm; + private final long lastAcceptedVersion; + + public Join(DiscoveryNode sourceNode, DiscoveryNode targetNode, long term, long lastAcceptedTerm, long lastAcceptedVersion) { + assert term >= 0; + assert lastAcceptedTerm >= 0; + assert lastAcceptedVersion >= 0; + + this.sourceNode = sourceNode; + this.targetNode = targetNode; + this.term = term; + this.lastAcceptedTerm = lastAcceptedTerm; + this.lastAcceptedVersion = lastAcceptedVersion; + } + + public Join(StreamInput in) throws IOException { + sourceNode = new DiscoveryNode(in); + targetNode = new DiscoveryNode(in); + term = in.readLong(); + lastAcceptedTerm = in.readLong(); + lastAcceptedVersion = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + sourceNode.writeTo(out); + targetNode.writeTo(out); + out.writeLong(term); + out.writeLong(lastAcceptedTerm); + out.writeLong(lastAcceptedVersion); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + public long getLastAcceptedVersion() { + return lastAcceptedVersion; + } + + public long getTerm() { + return term; + } + + public long getLastAcceptedTerm() { + return lastAcceptedTerm; + } + + @Override + public String toString() { + return "Join{" + + "term=" + term + + ", lastAcceptedTerm=" + lastAcceptedTerm + + ", lastAcceptedVersion=" + lastAcceptedVersion + + ", sourceNode=" + sourceNode + + ", targetNode=" + targetNode + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Join join = (Join) o; + + if (sourceNode.equals(join.sourceNode) == false) return false; + if (targetNode.equals(join.targetNode) == false) return false; + if (lastAcceptedVersion != join.lastAcceptedVersion) return false; + if (term != join.term) return false; + return lastAcceptedTerm == join.lastAcceptedTerm; + } + + @Override + public int hashCode() { + int result = (int) (lastAcceptedVersion ^ (lastAcceptedVersion >>> 32)); + result = 31 * result + sourceNode.hashCode(); + result = 31 * result + targetNode.hashCode(); + result = 31 * result + (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (lastAcceptedTerm ^ (lastAcceptedTerm >>> 32)); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishRequest.java new file mode 100644 index 0000000000000..be4ca3d081f8d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * Request which is used by the master node to publish cluster state changes. + */ +public class PublishRequest extends TransportRequest { + + private final ClusterState acceptedState; + + public PublishRequest(ClusterState acceptedState) { + this.acceptedState = acceptedState; + } + + public PublishRequest(StreamInput in, DiscoveryNode localNode) throws IOException { + super(in); + acceptedState = ClusterState.readFrom(in, localNode); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + acceptedState.writeTo(out); + } + + public ClusterState getAcceptedState() { + return acceptedState; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PublishRequest)) return false; + + PublishRequest that = (PublishRequest) o; + + return acceptedState.term() == that.acceptedState.term() && + acceptedState.version() == that.acceptedState.version(); + } + + @Override + public int hashCode() { + return Objects.hash(acceptedState.term(), acceptedState.version()); + } + + @Override + public String toString() { + return "PublishRequest{term=" + acceptedState.term() + + ", version=" + acceptedState.version() + + ", state=" + acceptedState + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java new file mode 100644 index 0000000000000..f213657ed2e7a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Response to a {@link PublishRequest}, carrying the term and version of the request. + */ +public class PublishResponse extends TermVersionResponse { + + public PublishResponse(long term, long version) { + super(term, version); + } + + public PublishResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public String toString() { + return "PublishResponse{" + + "term=" + term + + ", version=" + version + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/StartJoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/StartJoinRequest.java new file mode 100644 index 0000000000000..bac6c7798ed7e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/StartJoinRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +/** + * Represents the action of requesting a join vote (see {@link Join}) from a node. + * The source node represents the node that is asking for join votes. + */ +public class StartJoinRequest extends TransportRequest { + + private final DiscoveryNode sourceNode; + + private final long term; + + public StartJoinRequest(DiscoveryNode sourceNode, long term) { + this.sourceNode = sourceNode; + this.term = term; + } + + public StartJoinRequest(StreamInput input) throws IOException { + super(input); + this.sourceNode = new DiscoveryNode(input); + this.term = input.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeLong(term); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public long getTerm() { + return term; + } + + @Override + public String toString() { + return "StartJoinRequest{" + + "term=" + term + + ",node=" + sourceNode + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof StartJoinRequest)) return false; + + StartJoinRequest that = (StartJoinRequest) o; + + if (term != that.term) return false; + return sourceNode.equals(that.sourceNode); + } + + @Override + public int hashCode() { + int result = sourceNode.hashCode(); + result = 31 * result + (int) (term ^ (term >>> 32)); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionRequest.java new file mode 100644 index 0000000000000..e591803e167a1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionRequest.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +abstract class TermVersionRequest extends TransportRequest implements Writeable { + protected final DiscoveryNode sourceNode; + protected final long term; + protected final long version; + + TermVersionRequest(DiscoveryNode sourceNode, long term, long version) { + assert term >= 0; + assert version >= 0; + + this.sourceNode = sourceNode; + this.term = term; + this.version = version; + } + + TermVersionRequest(StreamInput in) throws IOException { + super(in); + sourceNode = new DiscoveryNode(in); + term = in.readLong(); + version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeLong(term); + out.writeLong(version); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TermVersionRequest versionTerm = (TermVersionRequest) o; + + if (term != versionTerm.term) return false; + if (version != versionTerm.version) return false; + return sourceNode.equals(versionTerm.sourceNode); + } + + @Override + public int hashCode() { + int result = (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + result = 31 * result + sourceNode.hashCode(); + return result; + } + + @Override + public String toString() { + return "TermVersionRequest{" + + "term=" + term + + ", version=" + version + + ", sourceNode=" + sourceNode + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java new file mode 100644 index 0000000000000..5eba2e6b732a5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; + +abstract class TermVersionResponse extends TransportResponse { + protected final long term; + protected final long version; + + TermVersionResponse(long term, long version) { + assert term >= 0; + assert version >= 0; + + this.term = term; + this.version = version; + } + + TermVersionResponse(StreamInput in) throws IOException { + this(in.readLong(), in.readLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(term); + out.writeLong(version); + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TermVersionResponse response = (TermVersionResponse) o; + + if (term != response.term) return false; + return version == response.version; + } + + @Override + public int hashCode() { + int result = (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + return result; + } + + @Override + public String toString() { + return "TermVersionResponse{" + + "term=" + term + + ", version=" + version + + '}'; + } +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 1f62eb706a84b..091c5846748ba 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException; import org.elasticsearch.cluster.routing.ShardRouting; @@ -810,6 +811,7 @@ public void testIds() { ids.put(147, org.elasticsearch.env.ShardLockObtainFailedException.class); ids.put(148, UnknownNamedObjectException.class); ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class); + ids.put(150, CoordinationStateRejectedException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java new file mode 100644 index 0000000000000..e7f61efa69054 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -0,0 +1,830 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Assertions; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.node.Node; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; +import org.junit.Before; + +import java.util.Collections; +import java.util.Optional; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class CoordinationStateTests extends ESTestCase { + + private DiscoveryNode node1; + private DiscoveryNode node2; + private DiscoveryNode node3; + + private ClusterState initialStateNode1; + + private PersistedState ps1; + + private CoordinationState cs1; + private CoordinationState cs2; + private CoordinationState cs3; + + @Before + public void setupNodes() { + node1 = createNode("node1"); + node2 = createNode("node2"); + node3 = createNode("node3"); + + initialStateNode1 = clusterState(0L, 0L, node1, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); + ClusterState initialStateNode2 = + clusterState(0L, 0L, node2, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); + ClusterState initialStateNode3 = + clusterState(0L, 0L, node3, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); + + ps1 = new InMemoryPersistedState(0L, initialStateNode1); + + cs1 = createCoordinationState(ps1, node1); + cs2 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode2), node2); + cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node3); + } + + private DiscoveryNode createNode(String id) { + return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); + } + + public void testSetInitialState() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + cs1.setInitialState(state1); + assertThat(cs1.getLastAcceptedState(), equalTo(state1)); + } + + public void testSetInitialStateWhenAlreadySet() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + cs1.setInitialState(state1); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.setInitialState(state1)).getMessage(), + containsString("initial state already set")); + } + + public void testStartJoinBeforeBootstrap() { + assertThat(cs1.getCurrentTerm(), equalTo(0L)); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(randomFrom(node1, node2), randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(v1.getTargetNode(), equalTo(startJoinRequest1.getSourceNode())); + assertThat(v1.getSourceNode(), equalTo(node1)); + assertThat(v1.getTerm(), equalTo(startJoinRequest1.getTerm())); + assertThat(v1.getLastAcceptedTerm(), equalTo(initialStateNode1.term())); + assertThat(v1.getLastAcceptedVersion(), equalTo(initialStateNode1.version())); + assertThat(cs1.getCurrentTerm(), equalTo(startJoinRequest1.getTerm())); + + StartJoinRequest startJoinRequest2 = new StartJoinRequest(randomFrom(node1, node2), + randomLongBetween(0, startJoinRequest1.getTerm())); + expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleStartJoin(startJoinRequest2)); + } + + public void testStartJoinAfterBootstrap() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(randomFrom(node1, node2), randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(v1.getTargetNode(), equalTo(startJoinRequest1.getSourceNode())); + assertThat(v1.getSourceNode(), equalTo(node1)); + assertThat(v1.getTerm(), equalTo(startJoinRequest1.getTerm())); + assertThat(v1.getLastAcceptedTerm(), equalTo(state1.term())); + assertThat(v1.getLastAcceptedVersion(), equalTo(state1.version())); + assertThat(cs1.getCurrentTerm(), equalTo(startJoinRequest1.getTerm())); + + StartJoinRequest startJoinRequest2 = new StartJoinRequest(randomFrom(node1, node2), + randomLongBetween(0, startJoinRequest1.getTerm())); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleStartJoin(startJoinRequest2)).getMessage(), + containsString("not greater than current term")); + StartJoinRequest startJoinRequest3 = new StartJoinRequest(randomFrom(node1, node2), + startJoinRequest1.getTerm()); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleStartJoin(startJoinRequest3)).getMessage(), + containsString("not greater than current term")); + } + + public void testJoinBeforeBootstrap() { + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(v1)).getMessage(), + containsString("initial configuration not set")); + } + + public void testJoinWithNoStartJoinAfterReboot() { + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + cs1 = createCoordinationState(ps1, node1); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(v1)).getMessage(), + containsString("ignored join as term has not been incremented yet after reboot")); + } + + public void testJoinWithWrongTarget() { + assumeTrue("test only works with assertions enabled", Assertions.ENABLED); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(expectThrows(AssertionError.class, () -> cs1.handleJoin(v1)).getMessage(), + containsString("wrong node")); + } + + public void testJoinWithBadCurrentTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + Join badJoin = new Join(randomFrom(node1, node2), node1, randomLongBetween(0, startJoinRequest1.getTerm() - 1), + randomNonNegativeLong(), randomNonNegativeLong()); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(badJoin)).getMessage(), + containsString("does not match current term")); + } + + public void testJoinWithHigherAcceptedTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join badJoin = new Join(randomFrom(node1, node2), node1, v1.getTerm(), randomLongBetween(state2.term() + 1, 30), + randomNonNegativeLong()); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(badJoin)).getMessage(), + containsString("higher than current last accepted term")); + } + + public void testJoinWithSameAcceptedTermButHigherVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join badJoin = new Join(randomFrom(node1, node2), node1, v1.getTerm(), state2.term(), + randomLongBetween(state2.version() + 1, 30)); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(badJoin)).getMessage(), + containsString("higher than current last accepted version")); + } + + public void testJoinWithLowerLastAcceptedTermWinsElection() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join join = new Join(node1, node1, v1.getTerm(), randomLongBetween(0, state2.term() - 1), randomLongBetween(0, 20)); + assertTrue(cs1.handleJoin(join)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.containsJoinVoteFor(node1)); + assertFalse(cs1.containsJoinVoteFor(node2)); + assertEquals(cs1.getLastPublishedVersion(), cs1.getLastAcceptedVersion()); + assertFalse(cs1.handleJoin(join)); + } + + public void testJoinWithSameLastAcceptedTermButLowerOrSameVersionWinsElection() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join join = new Join(node1, node1, v1.getTerm(), state2.term(), randomLongBetween(0, state2.version())); + assertTrue(cs1.handleJoin(join)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.containsJoinVoteFor(node1)); + assertFalse(cs1.containsJoinVoteFor(node2)); + assertEquals(cs1.getLastPublishedVersion(), cs1.getLastAcceptedVersion()); + assertFalse(cs1.handleJoin(join)); + } + + public void testJoinDoesNotWinElection() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join join = new Join(node2, node1, v1.getTerm(), randomLongBetween(0, state2.term()), randomLongBetween(0, state2.version())); + assertTrue(cs1.handleJoin(join)); + assertFalse(cs1.electionWon()); + assertEquals(cs1.getLastPublishedVersion(), 0L); + assertFalse(cs1.handleJoin(join)); + } + + public void testJoinDoesNotWinElectionWhenOnlyCommittedConfigQuorum() { + VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId())); + VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, configNode1, configNode2, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join join = cs1.handleStartJoin(startJoinRequest); + assertTrue(cs1.handleJoin(join)); + assertFalse(cs1.electionWon()); + assertEquals(cs1.getLastPublishedVersion(), 0L); + assertFalse(cs1.handleJoin(join)); + } + + public void testJoinDoesNotWinElectionWhenOnlyLastAcceptedConfigQuorum() { + VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId())); + VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, configNode2, configNode1, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join join = cs1.handleStartJoin(startJoinRequest); + assertTrue(cs1.handleJoin(join)); + assertFalse(cs1.electionWon()); + assertEquals(cs1.getLastPublishedVersion(), 0L); + assertFalse(cs1.handleJoin(join)); + } + + public void testHandleClientValue() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + assertThat(publishRequest.getAcceptedState(), equalTo(state2)); + assertThat(cs1.getLastPublishedVersion(), equalTo(state2.version())); + // check that another join does not mess with lastPublishedVersion + Join v3 = cs3.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v3)); + assertThat(cs1.getLastPublishedVersion(), equalTo(state2.version())); + } + + public void testHandleClientValueWhenElectionNotWon() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + if (randomBoolean()) { + cs1.setInitialState(state1); + } + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state1)).getMessage(), + containsString("election not won")); + } + + public void testHandleClientValueDuringOngoingPublication() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + cs1.handleClientValue(state2); + + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), 3L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state3)).getMessage(), + containsString("cannot start publishing next value before accepting previous one")); + } + + public void testHandleClientValueWithBadTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(3, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + ClusterState state2 = clusterState(term, 2L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("does not match current term")); + } + + public void testHandleClientValueWithOldVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 1L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("lower or equal to last published version")); + } + + public void testHandleClientValueWithDifferentReconfigurationWhileAlreadyReconfiguring() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig1 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig1, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + cs1.handlePublishRequest(publishRequest); + VotingConfiguration newConfig2 = new VotingConfiguration(Collections.singleton(node3.getId())); + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), 3L, node1, initialConfig, newConfig2, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state3)).getMessage(), + containsString("only allow reconfiguration while not already reconfiguring")); + } + + public void testHandleClientValueWithSameReconfigurationWhileAlreadyReconfiguring() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig1 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig1, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + cs1.handlePublishRequest(publishRequest); + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), 3L, node1, initialConfig, newConfig1, 42L); + cs1.handleClientValue(state3); + } + + public void testHandleClientValueWithIllegalCommittedConfigurationChange() { + assumeTrue("test only works with assertions enabled", Assertions.ENABLED); + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, newConfig, newConfig, 42L); + assertThat(expectThrows(AssertionError.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("last committed configuration should not change")); + } + + public void testHandleClientValueWithConfigurationChangeButNoJoinQuorum() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("only allow reconfiguration if joinVotes have quorum for new config")); + } + + public void testHandlePublishRequest() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + if (randomBoolean()) { + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + } + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(1, 10), node1, initialConfig, initialConfig, 13L); + PublishResponse publishResponse = cs1.handlePublishRequest(new PublishRequest(state2)); + assertThat(publishResponse.getTerm(), equalTo(state2.term())); + assertThat(publishResponse.getVersion(), equalTo(state2.version())); + assertThat(cs1.getLastAcceptedState(), equalTo(state2)); + assertThat(value(cs1.getLastAcceptedState()), equalTo(13L)); + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(state2.getVersion() + 1, 20), node1, + initialConfig, initialConfig, 13L); + cs1.handlePublishRequest(new PublishRequest(state3)); + } + + public void testHandlePublishRequestWithBadTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + if (randomBoolean()) { + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + } + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + ClusterState state2 = clusterState(term, 2L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishRequest(new PublishRequest(state2))).getMessage(), + containsString("does not match current term")); + } + + // scenario when handling a publish request from a master that we already received a newer state from + public void testHandlePublishRequestWithSameTermButOlderOrSameVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + if (randomBoolean()) { + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + } + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(0, state2.version()), node1, initialConfig, + initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishRequest(new PublishRequest(state3))).getMessage(), + containsString("lower or equal to current version")); + } + + // scenario when handling a publish request from a fresh master + public void testHandlePublishRequestWithTermHigherThanLastAcceptedTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + ClusterState state1 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + cs2.handleStartJoin(startJoinRequest1); + cs2.handlePublishRequest(new PublishRequest(state1)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node1, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + cs2.handleStartJoin(startJoinRequest2); + ClusterState state2 = clusterState(startJoinRequest2.getTerm(), randomLongBetween(0, 20), node1, initialConfig, + initialConfig, 42L); + cs2.handlePublishRequest(new PublishRequest(state2)); + } + + public void testHandlePublishResponseWithCommit() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + Optional applyCommit = cs1.handlePublishResponse(node1, publishResponse); + assertTrue(applyCommit.isPresent()); + assertThat(applyCommit.get().getSourceNode(), equalTo(node1)); + assertThat(applyCommit.get().getTerm(), equalTo(state2.term())); + assertThat(applyCommit.get().getVersion(), equalTo(state2.version())); + } + + public void testHandlePublishResponseWhenSteppedDownAsLeader() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node1, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + cs1.handleStartJoin(startJoinRequest2); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishResponse(randomFrom(node1, node2, node3), publishResponse)).getMessage(), + containsString("election not won")); + } + + public void testHandlePublishResponseWithoutPublishConfigQuorum() { + VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId())); + VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, configNode1, configNode1, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v2)); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, configNode1, configNode2, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + Optional applyCommit = cs1.handlePublishResponse(node1, publishResponse); + assertFalse(applyCommit.isPresent()); + } + + public void testHandlePublishResponseWithoutCommitedConfigQuorum() { + VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId())); + VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, configNode1, configNode1, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v2)); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, configNode1, configNode2, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs2.handlePublishRequest(publishRequest); + Optional applyCommit = cs1.handlePublishResponse(node2, publishResponse); + assertFalse(applyCommit.isPresent()); + } + + public void testHandlePublishResponseWithoutCommit() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + Optional applyCommit = cs1.handlePublishResponse(node2, publishResponse); + assertFalse(applyCommit.isPresent()); + } + + public void testHandlePublishResponseWithBadTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + PublishResponse publishResponse = cs1.handlePublishRequest(new PublishRequest(state2)); + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishResponse(randomFrom(node1, node2, node3), + new PublishResponse(term, publishResponse.getVersion()))).getMessage(), + containsString("does not match current term")); + } + + public void testHandlePublishResponseWithVersionMismatch() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + PublishResponse publishResponse = cs1.handlePublishRequest(new PublishRequest(state2)); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishResponse(randomFrom(node1, node2, node3), publishResponse)).getMessage(), + containsString("does not match current version")); + } + + public void testHandleCommit() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v2)); + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig, 7L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + cs1.handlePublishResponse(node1, publishResponse); + Optional applyCommit = cs1.handlePublishResponse(node2, publishResponse); + assertTrue(applyCommit.isPresent()); + assertThat(cs1.getLastCommittedConfiguration(), equalTo(initialConfig)); + cs1.handleCommit(applyCommit.get()); + assertThat(cs1.getLastCommittedConfiguration(), equalTo(newConfig)); + } + + public void testHandleCommitWithBadCurrentTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 7L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + cs1.handlePublishResponse(node1, publishResponse); + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handleCommit(new ApplyCommitRequest(node1, term, 2L))).getMessage(), + containsString("does not match current term")); + } + + public void testHandleCommitWithBadLastAcceptedTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handleCommit(new ApplyCommitRequest(node1, startJoinRequest1.getTerm(), 2L))).getMessage(), + containsString("does not match last accepted term")); + } + + public void testHandleCommitWithBadVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 7L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + cs1.handlePublishRequest(publishRequest); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handleCommit(new ApplyCommitRequest(node1, startJoinRequest1.getTerm(), randomLongBetween(3, 10)))).getMessage(), + containsString("does not match current version")); + } + + public void testVoteCollection() { + final CoordinationState.VoteCollection voteCollection = new CoordinationState.VoteCollection(); + assertTrue(voteCollection.isEmpty()); + voteCollection.addVote(node1); + assertFalse(voteCollection.isEmpty()); + assertTrue(voteCollection.containsVoteFor(node1)); + assertFalse(voteCollection.containsVoteFor(node2)); + assertFalse(voteCollection.containsVoteFor(node3)); + voteCollection.addVote(node2); + assertTrue(voteCollection.containsVoteFor(node1)); + assertTrue(voteCollection.containsVoteFor(node2)); + assertFalse(voteCollection.containsVoteFor(node3)); + assertTrue(voteCollection.isQuorum(new VotingConfiguration(Sets.newHashSet(node1.getId(), node2.getId())))); + assertTrue(voteCollection.isQuorum(new VotingConfiguration(Sets.newHashSet(node1.getId())))); + assertFalse(voteCollection.isQuorum(new VotingConfiguration(Sets.newHashSet(node3.getId())))); + + EqualsHashCodeTestUtils.CopyFunction copyFunction = + vc -> { + CoordinationState.VoteCollection voteCollection1 = new CoordinationState.VoteCollection(); + for (DiscoveryNode node : vc.nodes()) { + voteCollection1.addVote(node); + } + return voteCollection1; + }; + EqualsHashCodeTestUtils.checkEqualsAndHashCode(voteCollection, copyFunction, + vc -> { + CoordinationState.VoteCollection copy = copyFunction.copy(vc); + copy.addVote(createNode(randomAlphaOfLength(10))); + return copy; + }); + } + + public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) { + final Settings initialSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + return new CoordinationState(initialSettings, localNode, storage); + } + + public static ClusterState clusterState(long term, long version, DiscoveryNode localNode, VotingConfiguration lastCommittedConfig, + VotingConfiguration lastAcceptedConfig, long value) { + return clusterState(term, version, DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build(), + lastCommittedConfig, lastAcceptedConfig, value); + } + + public static ClusterState clusterState(long term, long version, DiscoveryNodes discoveryNodes, VotingConfiguration lastCommittedConfig, + VotingConfiguration lastAcceptedConfig, long value) { + return setValue(ClusterState.builder(ClusterName.DEFAULT) + .version(version) + .term(term) + .lastCommittedConfiguration(lastCommittedConfig) + .lastAcceptedConfiguration(lastAcceptedConfig) + .nodes(discoveryNodes) + .metaData(MetaData.builder() + .clusterUUID(UUIDs.randomBase64UUID(random()))) // generate cluster UUID deterministically for repeatable tests + .stateUUID(UUIDs.randomBase64UUID(random())) // generate cluster state UUID deterministically for repeatable tests + .build(), value); + } + + public static ClusterState setValue(ClusterState clusterState, long value) { + return ClusterState.builder(clusterState).metaData( + MetaData.builder(clusterState.metaData()) + .persistentSettings(Settings.builder() + .put(clusterState.metaData().persistentSettings()) + .put("value", value) + .build()) + .build()) + .build(); + } + + public static long value(ClusterState clusterState) { + return clusterState.metaData().persistentSettings().getAsLong("value", 0L); + } + + public static class InMemoryPersistedState implements PersistedState { + + private long currentTerm; + private ClusterState acceptedState; + + public InMemoryPersistedState(long term, ClusterState acceptedState) { + this.currentTerm = term; + this.acceptedState = acceptedState; + + assert currentTerm >= 0; + assert getLastAcceptedState().term() <= currentTerm : + "last accepted term " + getLastAcceptedState().term() + " cannot be above current term " + currentTerm; + } + + @Override + public void setCurrentTerm(long currentTerm) { + assert this.currentTerm <= currentTerm; + this.currentTerm = currentTerm; + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + this.acceptedState = clusterState; + } + + @Override + public long getCurrentTerm() { + return currentTerm; + } + + @Override + public ClusterState getLastAcceptedState() { + return acceptedState; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java new file mode 100644 index 0000000000000..7fa4a3217348f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; + +public class MessagesTests extends ESTestCase { + + private DiscoveryNode createNode(String id) { + return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); + } + + public void testJoinEqualsHashCodeSerialization() { + Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoin, + join -> copyWriteable(join, writableRegistry(), Join::new), + join -> { + switch (randomInt(4)) { + case 0: + // change sourceNode + return new Join(createNode(randomAlphaOfLength(20)), join.getTargetNode(), join.getTerm(), + join.getLastAcceptedTerm(), join.getLastAcceptedVersion()); + case 1: + // change targetNode + return new Join(join.getSourceNode(), createNode(randomAlphaOfLength(20)), join.getTerm(), + join.getLastAcceptedTerm(), join.getLastAcceptedVersion()); + case 2: + // change term + return new Join(join.getSourceNode(), join.getTargetNode(), + randomValueOtherThan(join.getTerm(), ESTestCase::randomNonNegativeLong), join.getLastAcceptedTerm(), + join.getLastAcceptedVersion()); + case 3: + // change last accepted term + return new Join(join.getSourceNode(), join.getTargetNode(), join.getTerm(), + randomValueOtherThan(join.getLastAcceptedTerm(), ESTestCase::randomNonNegativeLong), + join.getLastAcceptedVersion()); + case 4: + // change version + return new Join(join.getSourceNode(), join.getTargetNode(), + join.getTerm(), join.getLastAcceptedTerm(), + randomValueOtherThan(join.getLastAcceptedVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public void testPublishRequestEqualsHashCodeSerialization() { + PublishRequest initialPublishRequest = new PublishRequest(randomClusterState()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishRequest, + publishRequest -> copyWriteable(publishRequest, writableRegistry(), + in -> new PublishRequest(in, publishRequest.getAcceptedState().nodes().getLocalNode())), + in -> new PublishRequest(randomClusterState())); + } + + public void testPublishResponseEqualsHashCodeSerialization() { + PublishResponse initialPublishResponse = new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishResponse, + publishResponse -> copyWriteable(publishResponse, writableRegistry(), PublishResponse::new), + publishResponse -> { + switch (randomInt(1)) { + case 0: + // change term + return new PublishResponse(randomValueOtherThan(publishResponse.getTerm(), ESTestCase::randomNonNegativeLong), + publishResponse.getVersion()); + case 1: + // change version + return new PublishResponse(publishResponse.getTerm(), + randomValueOtherThan(publishResponse.getVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public void testStartJoinRequestEqualsHashCodeSerialization() { + StartJoinRequest initialStartJoinRequest = new StartJoinRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialStartJoinRequest, + startJoinRequest -> copyWriteable(startJoinRequest, writableRegistry(), StartJoinRequest::new), + startJoinRequest -> { + switch (randomInt(1)) { + case 0: + // change sourceNode + return new StartJoinRequest(createNode(randomAlphaOfLength(20)), startJoinRequest.getTerm()); + case 1: + // change term + return new StartJoinRequest(startJoinRequest.getSourceNode(), + randomValueOtherThan(startJoinRequest.getTerm(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public void testApplyCommitEqualsHashCodeSerialization() { + ApplyCommitRequest initialApplyCommit = new ApplyCommitRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialApplyCommit, + applyCommit -> copyWriteable(applyCommit, writableRegistry(), ApplyCommitRequest::new), + applyCommit -> { + switch (randomInt(2)) { + case 0: + // change sourceNode + return new ApplyCommitRequest(createNode(randomAlphaOfLength(20)), applyCommit.getTerm(), applyCommit.getVersion()); + case 1: + // change term + return new ApplyCommitRequest(applyCommit.getSourceNode(), + randomValueOtherThan(applyCommit.getTerm(), ESTestCase::randomNonNegativeLong), applyCommit.getVersion()); + case 2: + // change version + return new ApplyCommitRequest(applyCommit.getSourceNode(), applyCommit.getTerm(), + randomValueOtherThan(applyCommit.getVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public ClusterState randomClusterState() { + return CoordinationStateTests.clusterState(randomNonNegativeLong(), randomNonNegativeLong(), createNode(randomAlphaOfLength(10)), + new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))), + new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))), + randomLong()); + } +}