From db973aa0103fbf0e3db9195bab9d543a04fae519 Mon Sep 17 00:00:00 2001
From: vishal
Date: Fri, 5 Mar 2021 10:42:09 -0500
Subject: [PATCH 1/2] Save metrics to cloud to preserve metrics history

---
 pkg/operator/resources/job/batchapi/job.go    |  2 ++
 .../resources/job/batchapi/job_status.go      |  4 +--
 .../resources/job/batchapi/metrics.go         | 27 +++++++++++++++++++
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/pkg/operator/resources/job/batchapi/job.go b/pkg/operator/resources/job/batchapi/job.go
index 9b80eecc5a..8525206dca 100644
--- a/pkg/operator/resources/job/batchapi/job.go
+++ b/pkg/operator/resources/job/batchapi/job.go
@@ -220,10 +220,12 @@ func handleJobSubmissionError(jobKey spec.JobKey, jobErr error) {
 	}
 }
 
+// delete k8s job, queue and save batch metrics from prometheus to cloud
 func deleteJobRuntimeResources(jobKey spec.JobKey) error {
 	err := errors.FirstError(
 		deleteK8sJob(jobKey),
 		deleteQueueByJobKeyIfExists(jobKey),
+		saveMetricsToCloud(jobKey),
 	)
 
 	if err != nil {
diff --git a/pkg/operator/resources/job/batchapi/job_status.go b/pkg/operator/resources/job/batchapi/job_status.go
index d96787d42e..5f5f163766 100644
--- a/pkg/operator/resources/job/batchapi/job_status.go
+++ b/pkg/operator/resources/job/batchapi/job_status.go
@@ -88,8 +88,8 @@ func getJobStatusFromJobState(jobState *job.State, k8sJob *kbatch.Job, pods []kc
 		}
 	}
 
-	if jobState.Status.IsCompleted() && jobState.EndTime != nil {
-		metrics, err := getBatchMetrics(jobKey, *jobState.EndTime)
+	if _, ok := jobState.LastUpdatedMap[_completedMetrics]; ok && jobState.Status.IsCompleted() {
+		metrics, err := readMetricsFromCloud(jobKey)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/operator/resources/job/batchapi/metrics.go b/pkg/operator/resources/job/batchapi/metrics.go
index 76e132cda7..107d844637 100644
--- a/pkg/operator/resources/job/batchapi/metrics.go
+++ b/pkg/operator/resources/job/batchapi/metrics.go
@@ -19,6 +19,7 @@ package batchapi
 import (
 	"context"
 	"fmt"
+	"path"
 	"time"
 
 	"github.com/cortexlabs/cortex/pkg/lib/errors"
@@ -32,6 +33,7 @@ import (
 
 const (
 	_metricsRequestTimeoutSeconds = 10
+	_completedMetricsFileKey      = "metrics.json"
 )
 
 func getBatchMetrics(jobKey spec.JobKey, t time.Time) (metrics.BatchMetrics, error) {
@@ -143,3 +145,28 @@ func queryPrometheusVec(promAPIv1 promv1.API, query string, t time.Time) (model.
 
 	return values, nil
 }
+
+func saveMetricsToCloud(jobKey spec.JobKey) error {
+	t := time.Now()
+	batchMetrics, err := getBatchMetrics(jobKey, t)
+	if err != nil {
+		return err
+	}
+
+	s3Key := path.Join(jobKey.Prefix(config.ClusterName()), _completedMetricsFileKey)
+	err = config.UploadJSONToBucket(batchMetrics, s3Key)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func readMetricsFromCloud(jobKey spec.JobKey) (metrics.BatchMetrics, error) {
+	s3Key := path.Join(jobKey.Prefix(config.ClusterName()), _completedMetricsFileKey)
+	batchMetrics := metrics.BatchMetrics{}
+	err := config.ReadJSONFromBucket(&batchMetrics, s3Key)
+	if err != nil {
+		return batchMetrics, err
+	}
+	return batchMetrics, nil
+}

From a2d14bb525b98a7e6665061b05bb6a14b454f7e4 Mon Sep 17 00:00:00 2001
From: vishal
Date: Fri, 5 Mar 2021 11:50:30 -0500
Subject: [PATCH 2/2] Update job_status.go

---
 pkg/operator/resources/job/batchapi/job_status.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/operator/resources/job/batchapi/job_status.go b/pkg/operator/resources/job/batchapi/job_status.go
index 5f5f163766..a058a12943 100644
--- a/pkg/operator/resources/job/batchapi/job_status.go
+++ b/pkg/operator/resources/job/batchapi/job_status.go
@@ -88,7 +88,7 @@ func getJobStatusFromJobState(jobState *job.State, k8sJob *kbatch.Job, pods []kc
 		}
 	}
 
-	if _, ok := jobState.LastUpdatedMap[_completedMetrics]; ok && jobState.Status.IsCompleted() {
+	if _, ok := jobState.LastUpdatedMap[_completedMetricsFileKey]; ok && jobState.Status.IsCompleted() {
 		metrics, err := readMetricsFromCloud(jobKey)
 		if err != nil {
 			return nil, err