Skip to content

Commit 28ba9a9

Browse files
authored
Determine job status based on k8s job and fix pointer bug (#1917)
1 parent a47b439 commit 28ba9a9

File tree

2 files changed

+33
-47
lines changed

2 files changed

+33
-47
lines changed

pkg/operator/resources/job/batchapi/cron.go

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ func ManageJobResources() error {
9090
return err
9191
}
9292

93-
k8sJobMap := map[string]*kbatch.Job{}
93+
k8sJobMap := map[string]kbatch.Job{}
9494
k8sJobIDSet := strset.Set{}
9595
for _, kJob := range jobs {
96-
k8sJobMap[kJob.Labels["jobID"]] = &kJob
96+
k8sJobMap[kJob.Labels["jobID"]] = kJob
9797
k8sJobIDSet.Add(kJob.Labels["jobID"])
9898
}
9999

@@ -103,7 +103,7 @@ func ManageJobResources() error {
103103
queueURL = pointer.String(queueURLMap[jobKey.ID])
104104
}
105105

106-
k8sJob := k8sJobMap[jobKey.ID]
106+
k8sJob, jobFound := k8sJobMap[jobKey.ID]
107107

108108
jobLogger, err := operator.GetJobLogger(jobKey)
109109
if err != nil {
@@ -135,7 +135,7 @@ func ManageJobResources() error {
135135
continue
136136
}
137137

138-
newStatusCode, msg, err := reconcileInProgressJob(jobState, queueURL, k8sJob)
138+
newStatusCode, msg, err := reconcileInProgressJob(jobState, queueURL, jobFound)
139139
if err != nil {
140140
telemetry.Error(err)
141141
operatorLogger.Error(err)
@@ -150,7 +150,7 @@ func ManageJobResources() error {
150150
continue
151151
}
152152
}
153-
if queueURL == nil || k8sJob == nil {
153+
if queueURL == nil {
154154
// job has been submitted within the grace period, it may take a while for newly created queues and jobs to show up in list results
155155
continue
156156
}
@@ -249,7 +249,7 @@ func ManageJobResources() error {
249249
}
250250

251251
// verifies that queue exists for an in progress job and k8s job exists for a job in running status, if verification fails return a job code to reflect the state
252-
func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatch.Job) (status.JobCode, string, error) {
252+
func reconcileInProgressJob(jobState *job.State, queueURL *string, jobFound bool) (status.JobCode, string, error) {
253253
jobKey := jobState.JobKey
254254

255255
if queueURL == nil {
@@ -275,45 +275,49 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc
275275
return jobState.Status, "", nil
276276
}
277277

278-
if k8sJob == nil { // unexpected k8s job missing
278+
if !jobFound { // unexpected k8s job missing
279279
return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobKey.UserString()), nil
280280
}
281281
}
282282

283283
return jobState.Status, "", nil
284284
}
285285

286-
func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob *kbatch.Job) error {
286+
func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob kbatch.Job) error {
287287
jobKey := jobState.JobKey
288288

289289
jobFailed, err := checkForJobFailure(jobKey, k8sJob)
290290
if err != nil || jobFailed {
291291
return err
292292
}
293293

294-
queueMessages, err := getQueueMetricsFromURL(queueURL)
294+
jobLogger, err := operator.GetJobLogger(jobKey)
295295
if err != nil {
296296
return err
297297
}
298298

299-
jobLogger, err := operator.GetJobLogger(jobKey)
299+
// job is still in-progress
300+
if int(k8sJob.Status.Active) != 0 {
301+
return nil
302+
}
303+
304+
queueMessages, err := getQueueMetricsFromURL(queueURL)
300305
if err != nil {
301306
return err
302307
}
303308

304309
if !queueMessages.IsEmpty() {
305310
// Give time for queue metrics to reach consistency
306-
if k8sJob != nil && int(k8sJob.Status.Active) == 0 {
307-
if _jobsToDelete.Has(jobKey.ID) {
308-
_jobsToDelete.Remove(jobKey.ID)
309-
jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
310-
return errors.FirstError(
311-
job.SetUnexpectedErrorStatus(jobKey),
312-
deleteJobRuntimeResources(jobKey),
313-
)
314-
}
315-
_jobsToDelete.Add(jobKey.ID)
311+
if _jobsToDelete.Has(jobKey.ID) {
312+
_jobsToDelete.Remove(jobKey.ID)
313+
jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
314+
return errors.FirstError(
315+
job.SetUnexpectedErrorStatus(jobKey),
316+
deleteJobRuntimeResources(jobKey),
317+
)
316318
}
319+
_jobsToDelete.Add(jobKey.ID)
320+
317321
return nil
318322
}
319323

@@ -356,7 +360,7 @@ func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob *kbatch.Jo
356360
return nil
357361
}
358362

359-
func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
363+
func checkForJobFailure(jobKey spec.JobKey, k8sJob kbatch.Job) (bool, error) {
360364
jobLogger, err := operator.GetJobLogger(jobKey)
361365
if err != nil {
362366
return false, err
@@ -372,7 +376,7 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
372376
deleteJobRuntimeResources(jobKey),
373377
)
374378
}
375-
if k8sJob != nil && int(k8sJob.Status.Failed) > 0 {
379+
if int(k8sJob.Status.Failed) > 0 {
376380
podStatus := k8s.GetPodStatus(&pod)
377381
for _, containerStatus := range pod.Status.ContainerStatuses {
378382
if containerStatus.LastTerminationState.Terminated != nil {
@@ -394,9 +398,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
394398
}
395399
}
396400

397-
if k8sJob == nil {
398-
return false, nil
399-
}
400401
if int(k8sJob.Status.Failed) > 0 {
401402
if !reasonFound {
402403
jobLogger.Error("workers were killed for unknown reason")
@@ -405,12 +406,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
405406
job.SetWorkerErrorStatus(jobKey),
406407
deleteJobRuntimeResources(jobKey),
407408
)
408-
} else if int(k8sJob.Status.Active) == 0 && int(k8sJob.Status.Failed) == 0 && len(pods) == 0 {
409-
// really unexpected situation which doesn't hurt if we check
410-
return true, errors.FirstError(
411-
job.SetUnexpectedErrorStatus(jobKey),
412-
deleteJobRuntimeResources(jobKey),
413-
)
414409
}
415410

416411
return false, nil

pkg/operator/resources/job/taskapi/cron.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ func ManageJobResources() error {
7272
return err
7373
}
7474

75-
k8sJobMap := map[string]*kbatch.Job{}
75+
k8sJobMap := map[string]kbatch.Job{}
7676
k8sJobIDSet := strset.Set{}
7777
for _, kJob := range jobs {
78-
k8sJobMap[kJob.Labels["jobID"]] = &kJob
78+
k8sJobMap[kJob.Labels["jobID"]] = kJob
7979
k8sJobIDSet.Add(kJob.Labels["jobID"])
8080
}
8181

@@ -87,7 +87,7 @@ func ManageJobResources() error {
8787
continue
8888
}
8989

90-
k8sJob := k8sJobMap[jobKey.ID]
90+
k8sJob, jobFound := k8sJobMap[jobKey.ID]
9191

9292
jobState, err := job.GetJobState(jobKey)
9393
if err != nil {
@@ -112,7 +112,7 @@ func ManageJobResources() error {
112112
}
113113

114114
// reconcile job state and k8s job
115-
newStatusCode, msg, err := reconcileInProgressJob(jobState, k8sJob)
115+
newStatusCode, msg, err := reconcileInProgressJob(jobState, jobFound)
116116
if err != nil {
117117
telemetry.Error(err)
118118
operatorLogger.Error(err)
@@ -187,21 +187,21 @@ func ManageJobResources() error {
187187
}
188188

189189
// verifies k8s job exists for a job in running status, if verification fails return a job code to reflect the state
190-
func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.JobCode, string, error) {
190+
func reconcileInProgressJob(jobState *job.State, jobFound bool) (status.JobCode, string, error) {
191191
if jobState.Status == status.JobRunning {
192192
if time.Now().Sub(jobState.LastUpdatedMap[status.JobRunning.String()]) <= _k8sJobExistenceGracePeriod {
193193
return jobState.Status, "", nil
194194
}
195195

196-
if k8sJob == nil { // unexpected k8s job missing
196+
if !jobFound { // unexpected k8s job missing
197197
return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobState.JobKey.UserString()), nil
198198
}
199199
}
200200

201201
return jobState.Status, "", nil
202202
}
203203

204-
func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
204+
func checkIfJobCompleted(jobKey spec.JobKey, k8sJob kbatch.Job) error {
205205
pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID)
206206
for _, pod := range pods {
207207
if k8s.WasPodOOMKilled(&pod) {
@@ -212,9 +212,6 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
212212
}
213213
}
214214

215-
if k8sJob == nil {
216-
return nil
217-
}
218215
if int(k8sJob.Status.Failed) == 1 {
219216
return errors.FirstError(
220217
job.SetWorkerErrorStatus(jobKey),
@@ -225,12 +222,6 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
225222
job.SetSucceededStatus(jobKey),
226223
deleteJobRuntimeResources(jobKey),
227224
)
228-
} else if int(k8sJob.Status.Active) == 0 && int(k8sJob.Status.Failed) == 0 && len(pods) == 0 {
229-
// really unexpected situation which doesn't hurt if we check
230-
return errors.FirstError(
231-
job.SetUnexpectedErrorStatus(jobKey),
232-
deleteJobRuntimeResources(jobKey),
233-
)
234225
}
235226

236227
return nil

0 commit comments

Comments
 (0)