-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Zen2: Deterministic MasterService #32493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Pinging @elastic/es-distributed |
DaveCTurner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started to review this but did not get very far so only have superficial comments here. To be continued...
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.elasticsearch.discovery; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think I'd prefer this to be in org.elasticsearch.cluster.coordination.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 40d7c95
| * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether | ||
| * they updated their own cluster state or not. | ||
| * | ||
| * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think "throw" here now means "pass to publishListener::onFailure".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 40d7c95
| * Publish all the changes to the cluster from the master (can be called just by the master). The publish | ||
| * process should apply this state to the master as well! | ||
| * | ||
| * The publishListener allows to wait for the publication to go through. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"go through" meaning complete/fail/timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 40d7c95
|
|
||
| interface AckListener { | ||
| /** | ||
| * Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/discovery/coordination/?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 40d7c95
| void onCommit(TimeValue commitTime); | ||
|
|
||
| /** | ||
| * Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/discovery/coordination/?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 40d7c95
DaveCTurner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change to ZenDiscovery doesn't look right. Also some other minor comments.
| org.elasticsearch.indices.TypeMissingException::new, 137, UNKNOWN_VERSION_ADDED), | ||
| FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class, | ||
| org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED), | ||
| FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(FailedToCommitClusterStateException.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost all of these registrations use the fully-qualified class name (except CoordinationStateRejectedException, oops) so it looks like this should too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, also fixed for CoordinationStateRejectedException in d3c5a3d
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.elasticsearch.discovery; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be in org.elasticsearch.cluster.coordination.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in d3c5a3d
| assert false : "cluster state published locally neither processed nor failed: " + newState; | ||
| logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed", | ||
| newState.version()); | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This, and the containing synchronised block, don't look right. They throw FailedToCommitClusterStateException rather than passing it to the publishListener, and previously they returned early without blocking on the publication but the equivalent flow now would be to call publishListener.onResponse(null) early somehow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 506608c
| Environment environment = TestEnvironment.newEnvironment(settings); | ||
| Transport transport = mock(Transport.class); // it's not used | ||
| nextMasterTaskToRun = new AtomicReference<>(); | ||
| FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("fake-master", nextMasterTaskToRun::set); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need any kind of assertion that nextMasterTaskToRun isn't already set?
LGTM after 506608c, but needs another reviewer.
bleskes
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some minor comments. The main change LGTM. The only concern I had, as discussed with @ywelsch , is that the integration of FakeThreadPoolMasterService with ClusterStateChanges is a bit clunky. It is my understanding that FakeThreadPoolMasterService is a very useful testing component for other parts of the work, but in that case I rather not use (as is) with ClusterStateChanges.
|
|
||
| protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) throws Exception { | ||
| CompletableFuture<Void> fut = new CompletableFuture<>(); | ||
| clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use ActionListener#wrap to make this slightly more compact. Also, I presume you consciously choose for a CompletableFuture over PlainActionFuture ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I did not choose PlainActionFuture was because it asserts that we're not blocking on the MasterServiceUpdateThread (which this future deliberately does). Unfortunately, CompletableFuture has other problems (#32512 (comment)), so I've gone back to PlainActionFuture, but added a hook that allows to disable checking some of the assertions, see 526511d
| } | ||
| }, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state())); | ||
|
|
||
| final ActionListener<Void> publishListener = getPublishListener(clusterChangedEvent, taskOutputs, startTimeNS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this extra listener construct? at the moment it's activated fully sequentially. It will be simpler to just process the results of the future inline?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The extra listener construct is not needed. I've changed this in c84ddf7
| return newClusterState; | ||
| } | ||
|
|
||
| public Builder incrementVersion(ClusterState clusterState) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be protected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 526511d
| // no replicas in oder to skip the replication part | ||
| setState(clusterService, new ClusterStateChanges(xContentRegistry(), threadPool).closeIndices(state(index, true, | ||
| ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index))); | ||
| ClusterStateChanges clusterStateChanges = new ClusterStateChanges(xContentRegistry(), threadPool); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering - why is this change needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason was that we made ClusterStateChanges more realistic now by introducing MasterService to it. This test was using a cluster state where the local node was not the master. As ClusterStateChanges now used the proper MasterService, it simply rejected the cluster state update to close the indices.
|
|
||
| @Override | ||
| public void onResponse(Void aVoid) { | ||
| assertThat(countDownLatch.getCount(), is(1L)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe synchronize this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in b9d407f
Ok, I've reverted this in e4bd482 |
DaveCTurner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a couple of nits but the extra changes still LGTM.
| ids.put(138, null); | ||
| ids.put(139, null); | ||
| ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class); | ||
| ids.put(140, FailedToCommitClusterStateException.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should still be fully-qualified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
urgs... fixed in a0030c0
| final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() { | ||
| @Override | ||
| protected boolean blockingAllowed() { | ||
| // allow this one to block on the MasterServiceUpdateThread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment would be unnecessary if we wrote something like:
return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) || super.blockingAllowed();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great idea. fixed in a0030c0
Increases testability of MasterService and the discovery layer. Changes: