9292import static org .hamcrest .Matchers .equalTo ;
9393import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
9494import static org .hamcrest .Matchers .is ;
95+ import static org .hamcrest .Matchers .lessThanOrEqualTo ;
9596import static org .hamcrest .Matchers .not ;
9697
9798@ TestLogging ("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE" )
@@ -305,7 +306,7 @@ public void testAckListenerReceivesNackFromLeader() {
305306
306307 leader .setClusterStateApplyResponse (ClusterStateApplyResponse .FAIL );
307308 AckCollector ackCollector = leader .submitValue (randomLong ());
308- cluster .runFor (DEFAULT_CLUSTER_STATE_UPDATE_DELAY );
309+ cluster .runFor (DEFAULT_CLUSTER_STATE_UPDATE_DELAY , "committing value" );
309310 assertTrue (leader .coordinator .getMode () != Coordinator .Mode .LEADER || leader .coordinator .getCurrentTerm () > startingTerm );
310311 leader .setClusterStateApplyResponse (ClusterStateApplyResponse .SUCCEED );
311312 cluster .stabilise ();
@@ -325,7 +326,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {
325326
326327 follower0 .setClusterStateApplyResponse (ClusterStateApplyResponse .HANG );
327328 AckCollector ackCollector = leader .submitValue (randomLong ());
328- cluster .runFor (DEFAULT_CLUSTER_STATE_UPDATE_DELAY );
329+ cluster .runFor (DEFAULT_CLUSTER_STATE_UPDATE_DELAY , "committing value" );
329330 assertTrue ("expected immediate ack from " + follower1 , ackCollector .hasAckedSuccessfully (follower1 ));
330331 assertFalse ("expected no ack from " + leader , ackCollector .hasAckedSuccessfully (leader ));
331332 cluster .stabilise ();
@@ -344,7 +345,7 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() {
344345 follower0 .blackhole ();
345346 follower1 .blackhole ();
346347 AckCollector ackCollector = leader .submitValue (randomLong ());
347- cluster .runFor (DEFAULT_CLUSTER_STATE_UPDATE_DELAY );
348+ cluster .runFor (DEFAULT_CLUSTER_STATE_UPDATE_DELAY , "committing value" );
348349 assertFalse ("expected no immediate ack from " + leader , ackCollector .hasAcked (leader ));
349350 assertFalse ("expected no immediate ack from " + follower0 , ackCollector .hasAcked (follower0 ));
350351 assertFalse ("expected no immediate ack from " + follower1 , ackCollector .hasAcked (follower1 ));
@@ -501,6 +502,8 @@ void runRandomly() {
501502
502503 while (finishTime == -1 || deterministicTaskQueue .getCurrentTimeMillis () <= finishTime ) {
503504 step ++;
505+ final int thisStep = step ; // for lambdas
506+
504507 if (randomSteps <= step && finishTime == -1 ) {
505508 finishTime = deterministicTaskQueue .getLatestDeferredExecutionTime ();
506509 deterministicTaskQueue .setExecutionDelayVariabilityMillis (DEFAULT_DELAY_VARIABILITY );
@@ -511,14 +514,19 @@ void runRandomly() {
511514 if (rarely ()) {
512515 final ClusterNode clusterNode = getAnyNodePreferringLeaders ();
513516 final int newValue = randomInt ();
514- logger .debug ("----> [runRandomly {}] proposing new value [{}] to [{}]" , step , newValue , clusterNode .getId ());
515- clusterNode .submitValue (newValue );
517+ onNode (clusterNode .getLocalNode (), () -> {
518+ logger .debug ("----> [runRandomly {}] proposing new value [{}] to [{}]" ,
519+ thisStep , newValue , clusterNode .getId ());
520+ clusterNode .submitValue (newValue );
521+ }).run ();
516522 } else if (rarely ()) {
517523 final ClusterNode clusterNode = getAnyNode ();
518- logger .debug ("----> [runRandomly {}] forcing {} to become candidate" , step , clusterNode .getId ());
519- synchronized (clusterNode .coordinator .mutex ) {
520- clusterNode .coordinator .becomeCandidate ("runRandomly" );
521- }
524+ onNode (clusterNode .getLocalNode (), () -> {
525+ logger .debug ("----> [runRandomly {}] forcing {} to become candidate" , thisStep , clusterNode .getId ());
526+ synchronized (clusterNode .coordinator .mutex ) {
527+ clusterNode .coordinator .becomeCandidate ("runRandomly" );
528+ }
529+ }).run ();
522530 } else if (rarely ()) {
523531 final ClusterNode clusterNode = getAnyNode ();
524532
@@ -587,10 +595,10 @@ void stabilise() {
587595 stabilise (DEFAULT_STABILISATION_TIME );
588596 }
589597
590- void stabilise (long stabiliationDurationMillis ) {
591- logger . info ( "--> stabilising until [{}ms]" , deterministicTaskQueue . getCurrentTimeMillis () + stabiliationDurationMillis );
592- deterministicTaskQueue .setExecutionDelayVariabilityMillis ( DEFAULT_DELAY_VARIABILITY );
593- runFor (stabiliationDurationMillis );
598+ void stabilise (long stabilisationDurationMillis ) {
599+ assertThat ( "stabilisation requires default delay variability (and proper cleanup of raised variability)" ,
600+ deterministicTaskQueue .getExecutionDelayVariabilityMillis (), lessThanOrEqualTo ( DEFAULT_DELAY_VARIABILITY ) );
601+ runFor (stabilisationDurationMillis , "stabilising" );
594602
595603 // TODO remove when term-bumping is enabled
596604 final long maxTerm = clusterNodes .stream ().map (n -> n .coordinator .getCurrentTerm ()).max (Long ::compare ).orElse (0L );
@@ -600,20 +608,22 @@ void stabilise(long stabiliationDurationMillis) {
600608 if (maxLeaderTerm < maxTerm ) {
601609 logger .info ("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}" , maxTerm , maxLeaderTerm );
602610 final ClusterNode leader = getAnyLeader ();
603- synchronized (leader .coordinator . mutex ) {
604- leader .coordinator .ensureTermAtLeast ( leader . localNode , maxTerm + 1 );
605- }
606- leader . coordinator . startElection ();
607- logger . info ( "--> re-stabilising after term bump until [{}ms]" ,
608- deterministicTaskQueue . getCurrentTimeMillis () + DEFAULT_ELECTION_DELAY );
609- runFor (DEFAULT_ELECTION_DELAY );
611+ onNode (leader .getLocalNode (), () -> {
612+ synchronized ( leader .coordinator .mutex ) {
613+ leader . coordinator . ensureTermAtLeast ( leader . localNode , maxTerm + 1 );
614+ }
615+ leader . coordinator . startElection ();
616+ }). run ( );
617+ runFor (DEFAULT_ELECTION_DELAY , "re-stabilising after term bump" );
610618 }
619+ logger .info ("--> end of stabilisation" );
611620
612621 assertUniqueLeaderAndExpectedModes ();
613622 }
614623
615- void runFor (long runDurationMillis ) {
624+ void runFor (long runDurationMillis , String description ) {
616625 final long endTime = deterministicTaskQueue .getCurrentTimeMillis () + runDurationMillis ;
626+ logger .info ("----> runFor({}ms) running until [{}ms]: {}" , runDurationMillis , endTime , description );
617627
618628 while (deterministicTaskQueue .getCurrentTimeMillis () < endTime ) {
619629
@@ -637,6 +647,8 @@ void runFor(long runDurationMillis) {
637647
638648 deterministicTaskQueue .advanceTime ();
639649 }
650+
651+ logger .info ("----> runFor({}ms) completed run until [{}ms]: {}" , runDurationMillis , endTime , description );
640652 }
641653
642654 private boolean isConnectedPair (ClusterNode n1 , ClusterNode n2 ) {
@@ -963,7 +975,7 @@ public void run() {
963975 final ClusterState newClusterState = clusterStateSupplier .get ();
964976 assert oldClusterState .version () <= newClusterState .version () :
965977 "updating cluster state from version "
966- + oldClusterState .version () + " to stale version " + newClusterState .version ();
978+ + oldClusterState .version () + " to stale version " + newClusterState .version ();
967979 clusterApplier .lastAppliedClusterState = newClusterState ;
968980 }
969981
0 commit comments