diff --git a/pkg/operator/resources/job/batchapi/job.go b/pkg/operator/resources/job/batchapi/job.go index 9b80eecc5a..8525206dca 100644 --- a/pkg/operator/resources/job/batchapi/job.go +++ b/pkg/operator/resources/job/batchapi/job.go @@ -220,10 +220,12 @@ func handleJobSubmissionError(jobKey spec.JobKey, jobErr error) { } } +// delete k8s job, queue and save batch metrics from prometheus to cloud func deleteJobRuntimeResources(jobKey spec.JobKey) error { err := errors.FirstError( deleteK8sJob(jobKey), deleteQueueByJobKeyIfExists(jobKey), + saveMetricsToCloud(jobKey), ) if err != nil { diff --git a/pkg/operator/resources/job/batchapi/job_status.go b/pkg/operator/resources/job/batchapi/job_status.go index d96787d42e..a058a12943 100644 --- a/pkg/operator/resources/job/batchapi/job_status.go +++ b/pkg/operator/resources/job/batchapi/job_status.go @@ -88,8 +88,8 @@ func getJobStatusFromJobState(jobState *job.State, k8sJob *kbatch.Job, pods []kc } } - if jobState.Status.IsCompleted() && jobState.EndTime != nil { - metrics, err := getBatchMetrics(jobKey, *jobState.EndTime) + if _, ok := jobState.LastUpdatedMap[_completedMetricsFileKey]; ok && jobState.Status.IsCompleted() { + metrics, err := readMetricsFromCloud(jobKey) if err != nil { return nil, err } diff --git a/pkg/operator/resources/job/batchapi/metrics.go b/pkg/operator/resources/job/batchapi/metrics.go index 76e132cda7..107d844637 100644 --- a/pkg/operator/resources/job/batchapi/metrics.go +++ b/pkg/operator/resources/job/batchapi/metrics.go @@ -19,6 +19,7 @@ package batchapi import ( "context" "fmt" + "path" "time" "github.com/cortexlabs/cortex/pkg/lib/errors" @@ -32,6 +33,7 @@ import ( const ( _metricsRequestTimeoutSeconds = 10 + _completedMetricsFileKey = "metrics.json" ) func getBatchMetrics(jobKey spec.JobKey, t time.Time) (metrics.BatchMetrics, error) { @@ -143,3 +145,28 @@ func queryPrometheusVec(promAPIv1 promv1.API, query string, t time.Time) (model. 
return values, nil }

// saveMetricsToCloud fetches the batch metrics for jobKey as of the current
// time via getBatchMetrics and uploads them as JSON with
// config.UploadJSONToBucket. The object key is the job's prefix for this
// cluster joined with _completedMetricsFileKey.
//
// It returns the error from fetching the metrics, in which case nothing is
// uploaded, or otherwise whatever the upload itself returns.
func saveMetricsToCloud(jobKey spec.JobKey) error {
	snapshot, err := getBatchMetrics(jobKey, time.Now())
	if err != nil {
		return err
	}

	metricsKey := path.Join(jobKey.Prefix(config.ClusterName()), _completedMetricsFileKey)
	return config.UploadJSONToBucket(snapshot, metricsKey)
}

// readMetricsFromCloud reads the JSON object stored under the job's prefix for
// this cluster joined with _completedMetricsFileKey (the same key that
// saveMetricsToCloud writes) and decodes it into a metrics.BatchMetrics value.
//
// On failure the error from config.ReadJSONFromBucket is returned together
// with the metrics value in whatever state that call left it.
func readMetricsFromCloud(jobKey spec.JobKey) (metrics.BatchMetrics, error) {
	stored := metrics.BatchMetrics{}

	metricsKey := path.Join(jobKey.Prefix(config.ClusterName()), _completedMetricsFileKey)
	if err := config.ReadJSONFromBucket(&stored, metricsKey); err != nil {
		return stored, err
	}
	return stored, nil
}