Skip to content

Commit 35a0826

Browse files
authored
Fix worker no heartbeat race condition (#734)
* fix worker no heartbeat race condition * log refactor
1 parent e90032d commit 35a0826

File tree

2 files changed

+117
-17
lines changed
  • mantis-control-plane/mantis-control-plane-server/src

2 files changed

+117
-17
lines changed

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1943,15 +1943,25 @@ public void checkHeartBeats(Instant currentTime) {
19431943
acceptedAt);
19441944
}
19451945
} else {
1946-
if (Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds()
1946+
// no heartbeat or heartbeat too old
1947+
if (!workerMeta.getLastHeartbeatAt().isPresent() || Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds()
19471948
> missedHeartBeatToleranceSecs) {
1948-
// heartbeat too old
19491949
this.numWorkerMissingHeartbeat.increment();
1950-
LOGGER.info("Job {}, Worker {} Duration between last heartbeat and now {} "
1951-
+ "missed heart beat threshold {} exceeded", this.jobMgr.getJobId(),
1952-
workerMeta.getWorkerId(), Duration.between(
1953-
workerMeta.getLastHeartbeatAt().get(),
1954-
currentTime).getSeconds(), missedHeartBeatToleranceSecs);
1950+
1951+
if (!workerMeta.getLastHeartbeatAt().isPresent()) {
1952+
LOGGER.warn("Job {}, Worker {} hasn't received heartbeat, threshold {} exceeded",
1953+
this.jobMgr.getJobId(),
1954+
workerMeta.getWorkerId(),
1955+
missedHeartBeatToleranceSecs);
1956+
} else {
1957+
LOGGER.warn("Job {}, Worker {} Duration between last heartbeat and now {} "
1958+
+ "missed heart beat threshold {} exceeded",
1959+
this.jobMgr.getJobId(),
1960+
workerMeta.getWorkerId(),
1961+
Duration.between(
1962+
workerMeta.getLastHeartbeatAt().get(),
1963+
currentTime).getSeconds(), missedHeartBeatToleranceSecs);
1964+
}
19551965

19561966
if (ConfigurationProvider.getConfig().isHeartbeatTerminationEnabled()) {
19571967
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(WARN,

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@
2323
import static org.junit.Assert.assertTrue;
2424
import static org.junit.Assert.fail;
2525
import static org.mockito.Matchers.any;
26-
import static org.mockito.Mockito.mock;
27-
import static org.mockito.Mockito.timeout;
28-
import static org.mockito.Mockito.times;
29-
import static org.mockito.Mockito.verify;
26+
import static org.mockito.Mockito.*;
3027

3128
import akka.actor.ActorRef;
3229
import akka.actor.ActorSystem;
@@ -118,6 +115,7 @@ public void testJobSubmitWithoutInit() {
118115
try {
119116
jobDefn = JobTestHelper.generateJobDefinition(clusterName);
120117
MantisScheduler schedulerMock = mock(MantisScheduler.class);
118+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
121119
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
122120
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
123121
.withJobId(new JobId(clusterName,1))
@@ -151,6 +149,7 @@ public void testJobSubmit() {
151149
// IMantisStorageProvider storageProvider = new SimpleCachedFileStorageProvider();
152150
// MantisJobStore jobStore = new MantisJobStore(storageProvider);
153151
MantisScheduler schedulerMock = mock(MantisScheduler.class);
152+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
154153
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
155154
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
156155
.withJobId(new JobId(clusterName,1))
@@ -246,6 +245,7 @@ public void testJobSubmitPerpetual() {
246245
.withJobSla(new JobSla(0, 0, null, MantisJobDurationType.Perpetual, null))
247246
.build();
248247
MantisScheduler schedulerMock = mock(MantisScheduler.class);
248+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
249249
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
250250
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
251251
.withJobId(new JobId(clusterName,1))
@@ -338,6 +338,7 @@ public void testJobSubmitInitalizationFails() {
338338
try {
339339
jobDefn = JobTestHelper.generateJobDefinition(clusterName);
340340
MantisScheduler schedulerMock = mock(MantisScheduler.class);
341+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
341342
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
342343
Mockito.doThrow(IOException.class).when(jobStoreMock).storeNewJob(any());
343344
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
@@ -385,6 +386,7 @@ public void testJobSubmitWithMultipleWorkers() {
385386

386387
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
387388
MantisScheduler schedulerMock = mock(MantisScheduler.class);
389+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
388390
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
389391
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
390392
.withJobId(new JobId(clusterName,2))
@@ -497,6 +499,7 @@ public void testJobSubmitWithMultipleStagesAndWorkers() {
497499

498500
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
499501
MantisScheduler schedulerMock = mock(MantisScheduler.class);
502+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
500503
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
501504
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
502505
.withJobId(new JobId(clusterName,1))
@@ -605,6 +608,7 @@ public void testListActiveWorkers() {
605608

606609
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
607610
MantisScheduler schedulerMock = mock(MantisScheduler.class);
611+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
608612
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
609613
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
610614
.withJobId(new JobId(clusterName,2))
@@ -709,6 +713,7 @@ public void testkill() throws Exception {
709713
JobDefinition jobDefn = JobTestHelper.generateJobDefinition(clusterName);
710714

711715
MantisScheduler schedulerMock = mock(MantisScheduler.class);
716+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
712717
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
713718
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
714719
.withJobId(new JobId(clusterName,3))
@@ -741,7 +746,7 @@ public void testkill() throws Exception {
741746
}
742747

743748
@Test
744-
public void testHeartBeatMissingResubmit() {
749+
public void testNoHeartBeatAfterLaunchResubmit() {
745750
final TestKit probe = new TestKit(system);
746751
String clusterName= "testHeartBeatMissingResubmit";
747752
IJobClusterDefinition jobClusterDefn = JobTestHelper.generateJobClusterDefinition(clusterName);
@@ -753,6 +758,8 @@ public void testHeartBeatMissingResubmit() {
753758
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
754759

755760
MantisScheduler schedulerMock = mock(MantisScheduler.class);
761+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
762+
756763
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
757764
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
758765
.withJobId(new JobId(clusterName,2))
@@ -792,13 +799,95 @@ public void testHeartBeatMissingResubmit() {
792799
// 2 worker have started so job should be started.
793800
assertEquals(JobState.Accepted, resp3.getJobMetadata().get().getState());
794801

795-
JobTestHelper.sendHeartBeat(probe, jobActor, jobId,1, workerId2);
802+
Instant now = Instant.now();
803+
jobActor.tell(new JobProto.CheckHeartBeat(now.plusSeconds(240)), probe.getRef());
804+
Thread.sleep(1000);
805+
806+
// 1 original submissions and 0 resubmits because of worker not in launched state with HB timeouts
807+
verify(schedulerMock, times(1)).scheduleWorkers(any());
808+
// 1 kills due to resubmits
809+
verify(schedulerMock, times(0)).unscheduleAndTerminateWorker(any(), any());
796810

797-
JobTestHelper.sendHeartBeat(probe, jobActor, jobId,1, workerId);
811+
// launch worker but no HB yet
812+
JobTestHelper.sendWorkerLaunchedEvent(probe, jobActor, workerId2, stageNo);
798813

799814
// check hb status in the future where we expect all last HBs to be stale.
800-
Instant now = Instant.now();
815+
now = Instant.now();
801816
jobActor.tell(new JobProto.CheckHeartBeat(now.plusSeconds(240)), probe.getRef());
817+
Thread.sleep(1000);
818+
819+
// job status remain as accepted
820+
jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef());
821+
GetJobDetailsResponse resp4 = probe.expectMsgClass(GetJobDetailsResponse.class);
822+
assertEquals(SUCCESS, resp4.responseCode);
823+
assertEquals(JobState.Accepted, resp4.getJobMetadata().get().getState());
824+
825+
// 1 original submissions and 0 resubmits because of worker not in launched state with HB timeouts
826+
verify(schedulerMock, times(2)).scheduleWorkers(any());
827+
// 1 kills due to resubmits
828+
verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(eq(workerId2), any());
829+
} catch (Exception e) {
830+
fail("unexpected exception " + e.getMessage());
831+
}
832+
}
833+
834+
@Test
835+
public void testHeartBeatPendingSchedulingNoResubmit() {
836+
final TestKit probe = new TestKit(system);
837+
String clusterName= "testHeartBeatMissingResubmit";
838+
IJobClusterDefinition jobClusterDefn = JobTestHelper.generateJobClusterDefinition(clusterName);
839+
840+
JobDefinition jobDefn;
841+
try {
842+
SchedulingInfo sInfo = new SchedulingInfo.Builder().numberOfStages(1).multiWorkerStageWithConstraints(2, new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList()).build();
843+
844+
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
845+
846+
MantisScheduler schedulerMock = mock(MantisScheduler.class);
847+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
848+
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
849+
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
850+
.withJobId(new JobId(clusterName,2))
851+
.withSubmittedAt(Instant.now())
852+
.withJobState(JobState.Accepted)
853+
854+
.withNextWorkerNumToUse(1)
855+
.withJobDefinition(jobDefn)
856+
.build();
857+
final ActorRef jobActor = system.actorOf(JobActor.props(jobClusterDefn, mantisJobMetaData, jobStoreMock, schedulerMock, eventPublisher, costsCalculator));
858+
859+
jobActor.tell(new JobProto.InitJob(probe.getRef()), probe.getRef());
860+
JobProto.JobInitialized initMsg = probe.expectMsgClass(JobProto.JobInitialized.class);
861+
assertEquals(SUCCESS, initMsg.responseCode);
862+
String jobId = clusterName + "-2";
863+
jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef());
864+
GetJobDetailsResponse resp = probe.expectMsgClass(GetJobDetailsResponse.class);
865+
assertEquals(SUCCESS, resp.responseCode);
866+
assertEquals(JobState.Accepted,resp.getJobMetadata().get().getState());
867+
int stageNo = 1;
868+
869+
WorkerId workerId = new WorkerId(jobId, 0, 1);
870+
// check job status again
871+
jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef());
872+
GetJobDetailsResponse resp2 = probe.expectMsgClass(GetJobDetailsResponse.class);
873+
assertEquals(SUCCESS, resp2.responseCode);
874+
875+
// No worker has started.
876+
assertEquals(JobState.Accepted,resp2.getJobMetadata().get().getState());
877+
WorkerId workerId2 = new WorkerId(jobId, 1, 2);
878+
879+
// check job status again
880+
jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef());
881+
GetJobDetailsResponse resp3 = probe.expectMsgClass(GetJobDetailsResponse.class);
882+
assertEquals(SUCCESS, resp3.responseCode);
883+
884+
// 2 worker have started so job should be started.
885+
assertEquals(JobState.Accepted, resp3.getJobMetadata().get().getState());
886+
887+
// trigger HB check far into the future where no retry on scheduling is expected because the worker has not
888+
// switched into launched state yet.
889+
Instant now = Instant.now();
890+
jobActor.tell(new JobProto.CheckHeartBeat(now.plusSeconds(99999)), probe.getRef());
802891

803892
Thread.sleep(1000);
804893

@@ -807,8 +896,7 @@ public void testHeartBeatMissingResubmit() {
807896
// 0 kills due to resubmits
808897
verify(schedulerMock, times(0)).unscheduleAndTerminateWorker(any(), any());
809898
} catch (Exception e) {
810-
e.printStackTrace();
811-
fail();
899+
fail("unexpected exception " + e.getMessage());
812900
}
813901
}
814902

@@ -825,6 +913,7 @@ public void testHeartBeatEnforcement() {
825913
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
826914

827915
MantisScheduler schedulerMock = mock(MantisScheduler.class);
916+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
828917
MantisJobStore jobStoreMock = mock(MantisJobStore.class);
829918
MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
830919
.withJobId(new JobId(clusterName,2))
@@ -920,6 +1009,7 @@ public void testLostWorkerGetsReplaced() {
9201009

9211010
jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo);
9221011
MantisScheduler schedulerMock = mock(MantisScheduler.class);
1012+
when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true);
9231013
//MantisJobStore jobStoreMock = mock(MantisJobStore.class);
9241014

9251015
MantisJobStore jobStoreSpied = Mockito.spy(jobStore);

0 commit comments

Comments
 (0)