From db555a13d3dbc26bde5aa64e77ef8b9ac64c11a9 Mon Sep 17 00:00:00 2001
From: vishal
Date: Mon, 1 Mar 2021 10:46:18 -0500
Subject: [PATCH 1/2] Determine job status based on k8s job and convert pointers to not pointers

---
 pkg/operator/resources/job/batchapi/cron.go | 59 ++++++++++-----------
 pkg/operator/resources/job/taskapi/cron.go  | 17 +++---
 2 files changed, 35 insertions(+), 41 deletions(-)

diff --git a/pkg/operator/resources/job/batchapi/cron.go b/pkg/operator/resources/job/batchapi/cron.go
index 8cfc076648..c392555aa1 100644
--- a/pkg/operator/resources/job/batchapi/cron.go
+++ b/pkg/operator/resources/job/batchapi/cron.go
@@ -90,10 +90,10 @@ func ManageJobResources() error {
 		return err
 	}
 
-	k8sJobMap := map[string]*kbatch.Job{}
+	k8sJobMap := map[string]kbatch.Job{}
 	k8sJobIDSet := strset.Set{}
 	for _, kJob := range jobs {
-		k8sJobMap[kJob.Labels["jobID"]] = &kJob
+		k8sJobMap[kJob.Labels["jobID"]] = kJob
 		k8sJobIDSet.Add(kJob.Labels["jobID"])
 	}
 
@@ -103,7 +103,7 @@ func ManageJobResources() error {
 			queueURL = pointer.String(queueURLMap[jobKey.ID])
 		}
 
-		k8sJob := k8sJobMap[jobKey.ID]
+		k8sJob, jobFound := k8sJobMap[jobKey.ID]
 
 		jobLogger, err := operator.GetJobLogger(jobKey)
 		if err != nil {
@@ -135,7 +135,7 @@ func ManageJobResources() error {
 			continue
 		}
 
-		newStatusCode, msg, err := reconcileInProgressJob(jobState, queueURL, k8sJob)
+		newStatusCode, msg, err := reconcileInProgressJob(jobState, queueURL, jobFound)
 		if err != nil {
 			telemetry.Error(err)
 			operatorLogger.Error(err)
@@ -150,7 +150,7 @@ func ManageJobResources() error {
 				continue
 			}
 		}
-		if queueURL == nil || k8sJob == nil {
+		if queueURL == nil {
 			// job has been submitted within the grace period, it may take a while for a newly created queues and jobs to show up in list results
 			continue
 		}
@@ -249,7 +249,7 @@ func ManageJobResources() error {
 }
 
 // verifies that queue exists for an in progress job and k8s job exists for a job in running status, if verification fails return the a job code to reflect the state
-func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatch.Job) (status.JobCode, string, error) {
+func reconcileInProgressJob(jobState *job.State, queueURL *string, jobFound bool) (status.JobCode, string, error) {
 	jobKey := jobState.JobKey
 
 	if queueURL == nil {
@@ -275,7 +275,7 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc
 			return jobState.Status, "", nil
 		}
 
-		if k8sJob == nil { // unexpected k8s job missing
+		if !jobFound { // unexpected k8s job missing
 			return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobKey.UserString()), nil
 		}
 	}
@@ -283,35 +283,41 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc
 	return jobState.Status, "", nil
 }
 
-func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job) error {
+func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob kbatch.Job) error {
+	jobKey := jobState.JobKey
+
 	jobFailed, err := checkForJobFailure(jobKey, k8sJob)
 	if err != nil || jobFailed {
 		return err
 	}
 
-	queueMessages, err := getQueueMetricsFromURL(queueURL)
+	jobLogger, err := operator.GetJobLogger(jobKey)
 	if err != nil {
 		return err
 	}
 
-	jobLogger, err := operator.GetJobLogger(jobKey)
+	// job is still in-progress
+	if int(k8sJob.Status.Active) != 0 {
+		return nil
+	}
+
+	queueMessages, err := getQueueMetricsFromURL(queueURL)
 	if err != nil {
 		return err
 	}
 
 	if !queueMessages.IsEmpty() {
 		// Give time for queue metrics to reach consistency
-		if k8sJob != nil && int(k8sJob.Status.Active) == 0 {
-			if _jobsToDelete.Has(jobKey.ID) {
-				_jobsToDelete.Remove(jobKey.ID)
-				jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
-				return errors.FirstError(
-					job.SetUnexpectedErrorStatus(jobKey),
-					deleteJobRuntimeResources(jobKey),
-				)
-			}
-			_jobsToDelete.Add(jobKey.ID)
+		if _jobsToDelete.Has(jobKey.ID) {
+			_jobsToDelete.Remove(jobKey.ID)
+			jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
+			return errors.FirstError(
+				job.SetUnexpectedErrorStatus(jobKey),
+				deleteJobRuntimeResources(jobKey),
+			)
 		}
+		_jobsToDelete.Add(jobKey.ID)
+
 		return nil
 	}
 
@@ -350,7 +356,7 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job
 	return nil
 }
 
-func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
+func checkForJobFailure(jobKey spec.JobKey, k8sJob kbatch.Job) (bool, error) {
 	jobLogger, err := operator.GetJobLogger(jobKey)
 	if err != nil {
 		return false, err
@@ -366,7 +372,7 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
 				deleteJobRuntimeResources(jobKey),
 			)
 		}
-		if k8sJob != nil && int(k8sJob.Status.Failed) > 0 {
+		if int(k8sJob.Status.Failed) > 0 {
 			podStatus := k8s.GetPodStatus(&pod)
 			for _, containerStatus := range pod.Status.ContainerStatuses {
 				if containerStatus.LastTerminationState.Terminated != nil {
@@ -388,9 +394,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
 		}
 	}
 
-	if k8sJob == nil {
-		return false, nil
-	}
 	if int(k8sJob.Status.Failed) > 0 {
 		if !reasonFound {
 			jobLogger.Error("workers were killed for unknown reason")
@@ -399,12 +402,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
 			job.SetWorkerErrorStatus(jobKey),
 			deleteJobRuntimeResources(jobKey),
 		)
-	} else if int(k8sJob.Status.Active) == 0 && int(k8sJob.Status.Failed) == 0 && len(pods) == 0 {
-		// really unexpected situation which doesn't hurt if we check
-		return true, errors.FirstError(
-			job.SetUnexpectedErrorStatus(jobKey),
-			deleteJobRuntimeResources(jobKey),
-		)
 	}
 
 	return false, nil
diff --git a/pkg/operator/resources/job/taskapi/cron.go b/pkg/operator/resources/job/taskapi/cron.go
index 6fe125e03a..89c14f3783 100644
--- a/pkg/operator/resources/job/taskapi/cron.go
+++ b/pkg/operator/resources/job/taskapi/cron.go
@@ -72,10 +72,10 @@ func ManageJobResources() error {
 		return err
 	}
 
-	k8sJobMap := map[string]*kbatch.Job{}
+	k8sJobMap := map[string]kbatch.Job{}
 	k8sJobIDSet := strset.Set{}
 	for _, kJob := range jobs {
-		k8sJobMap[kJob.Labels["jobID"]] = &kJob
+		k8sJobMap[kJob.Labels["jobID"]] = kJob
 		k8sJobIDSet.Add(kJob.Labels["jobID"])
 	}
 
@@ -87,7 +87,7 @@ func ManageJobResources() error {
 			continue
 		}
 
-		k8sJob := k8sJobMap[jobKey.ID]
+		k8sJob, jobFound := k8sJobMap[jobKey.ID]
 
 		jobState, err := job.GetJobState(jobKey)
 		if err != nil {
@@ -112,7 +112,7 @@ func ManageJobResources() error {
 		}
 
 		// reconcile job state and k8s job
-		newStatusCode, msg, err := reconcileInProgressJob(jobState, k8sJob)
+		newStatusCode, msg, err := reconcileInProgressJob(jobState, jobFound)
 		if err != nil {
 			telemetry.Error(err)
 			operatorLogger.Error(err)
@@ -187,13 +187,13 @@ func ManageJobResources() error {
 }
 
 // verifies k8s job exists for a job in running status, if verification fails return a job code to reflect the state
-func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.JobCode, string, error) {
+func reconcileInProgressJob(jobState *job.State, jobFound bool) (status.JobCode, string, error) {
 	if jobState.Status == status.JobRunning {
 		if time.Now().Sub(jobState.LastUpdatedMap[status.JobRunning.String()]) <= _k8sJobExistenceGracePeriod {
 			return jobState.Status, "", nil
 		}
 
-		if k8sJob == nil { // unexpected k8s job missing
+		if !jobFound { // unexpected k8s job missing
 			return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; unable to find kubernetes job", jobState.JobKey.UserString()), nil
 		}
 	}
@@ -201,7 +201,7 @@ func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.Job
 	return jobState.Status, "", nil
 }
 
-func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
+func checkIfJobCompleted(jobKey spec.JobKey, k8sJob kbatch.Job) error {
 	pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID)
 	for _, pod := range pods {
 		if k8s.WasPodOOMKilled(&pod) {
@@ -212,9 +212,6 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
 		}
 	}
 
-	if k8sJob == nil {
-		return nil
-	}
 	if int(k8sJob.Status.Failed) == 1 {
 		return errors.FirstError(
 			job.SetWorkerErrorStatus(jobKey),

From 8d9c8c2eb19c96f02fb1b6dace7f57229487a3cf Mon Sep 17 00:00:00 2001
From: vishal
Date: Mon, 1 Mar 2021 15:35:10 -0500
Subject: [PATCH 2/2] Update cron.go

---
 pkg/operator/resources/job/taskapi/cron.go | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/pkg/operator/resources/job/taskapi/cron.go b/pkg/operator/resources/job/taskapi/cron.go
index 89c14f3783..2ad9f97cd6 100644
--- a/pkg/operator/resources/job/taskapi/cron.go
+++ b/pkg/operator/resources/job/taskapi/cron.go
@@ -222,12 +222,6 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob kbatch.Job) error {
 			job.SetSucceededStatus(jobKey),
 			deleteJobRuntimeResources(jobKey),
 		)
-	} else if int(k8sJob.Status.Active) == 0 && int(k8sJob.Status.Failed) == 0 && len(pods) == 0 {
-		// really unexpected situation which doesn't hurt if we check
-		return errors.FirstError(
-			job.SetUnexpectedErrorStatus(jobKey),
-			deleteJobRuntimeResources(jobKey),
-		)
 	}
 
 	return nil
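
Illustrative sketch (not part of the patch series above): the switch from map[string]*kbatch.Job to map[string]kbatch.Job, combined with the comma-ok lookup k8sJob, jobFound := k8sJobMap[jobKey.ID], is what lets reconcileInProgressJob take a plain jobFound bool instead of testing a pointer for nil. It also avoids taking the address of the range loop variable (&kJob), which in Go versions before 1.22 would have made every map entry point at the same variable. The standalone program below demonstrates those two idioms only; the Job struct and all identifiers are hypothetical stand-ins, not the real kbatch.Job type or the operator's code.

package main

import "fmt"

// Job is a hypothetical stand-in for kbatch.Job.
type Job struct {
	ID     string
	Active int32
}

func main() {
	jobs := []Job{{ID: "job-a", Active: 1}, {ID: "job-b"}}

	// Value-typed map, as in the patch: each entry stores its own copy,
	// so no pointer to the range loop variable is ever kept.
	k8sJobMap := map[string]Job{}
	for _, kJob := range jobs {
		k8sJobMap[kJob.ID] = kJob
	}

	// The comma-ok form distinguishes "key missing" from "zero-valued entry";
	// this boolean plays the role the patch gives to jobFound.
	k8sJob, jobFound := k8sJobMap["job-a"]
	fmt.Println(k8sJob.ID, jobFound) // job-a true

	_, jobFound = k8sJobMap["does-not-exist"]
	fmt.Println(jobFound) // false
}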