From aaa6791192c00678f138e1e801176fb346ebf320 Mon Sep 17 00:00:00 2001
From: vishal
Date: Wed, 21 Aug 2019 18:19:34 -0400
Subject: [PATCH 1/6] Use S3 as a metadata store for class names

---
 pkg/lib/aws/s3.go                      |  15 +++
 pkg/operator/api/context/apis.go       |   1 +
 pkg/operator/context/apis.go           |   2 +
 pkg/operator/context/context.go        |   2 +-
 pkg/operator/workloads/metrics.go      |  55 ++++-------
 pkg/workloads/cortex/lib/api_utils.py  | 129 ++++++++++++++++---------
 pkg/workloads/cortex/onnx_serve/api.py |   9 +-
 pkg/workloads/cortex/tf_api/api.py     |   9 +-
 8 files changed, 141 insertions(+), 81 deletions(-)

diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go
index fd15da4db9..4088a81148 100644
--- a/pkg/lib/aws/s3.go
+++ b/pkg/lib/aws/s3.go
@@ -196,6 +196,21 @@ func (c *Client) ReadBytesFromS3(key string) ([]byte, error) {
     return buf.Bytes(), nil
 }
 
+func (c *Client) ListPrefix(prefix string, maxResults int64) ([]*s3.Object, error) {
+    listObjectsInput := &s3.ListObjectsV2Input{
+        Bucket:  aws.String(c.Bucket),
+        Prefix:  aws.String(prefix),
+        MaxKeys: aws.Int64(maxResults),
+    }
+
+    output, err := c.s3Client.ListObjectsV2(listObjectsInput)
+    if err != nil {
+        return nil, errors.Wrap(err, prefix)
+    }
+
+    return output.Contents, nil
+}
+
 func (c *Client) DeleteFromS3ByPrefix(prefix string, continueIfFailure bool) error {
     listObjectsInput := &s3.ListObjectsV2Input{
         Bucket: aws.String(c.Bucket),
diff --git a/pkg/operator/api/context/apis.go b/pkg/operator/api/context/apis.go
index 2f4d47c074..7320f7b4fb 100644
--- a/pkg/operator/api/context/apis.go
+++ b/pkg/operator/api/context/apis.go
@@ -27,6 +27,7 @@ type API struct {
     *ComputedResourceFields
     Path                  string  `json:"path"`
     RequestHandlerImplKey *string `json:"request_handler_impl_key"`
+    MetadataKey           string  `json:"metadata_key"`
 }
 
 func APIPath(apiName string, appName string) string {
diff --git a/pkg/operator/context/apis.go b/pkg/operator/context/apis.go
index 35a3bb7c8a..b12c9404c7 100644
--- a/pkg/operator/context/apis.go
+++ b/pkg/operator/context/apis.go
@@ -37,6 +37,7 @@ var uploadedRequestHandlers = strset.New()
 
 func getAPIs(config *userconfig.Config,
     models context.Models,
+    metadataRoot string,
     datasetVersion string,
     impls map[string][]byte,
     pythonPackages context.PythonPackages,
@@ -95,6 +96,7 @@ func getAPIs(config *userconfig.Config,
             API:                   apiConfig,
             Path:                  context.APIPath(apiConfig.Name, config.App.Name),
             RequestHandlerImplKey: requestHandlerImplKey,
+            MetadataKey:           filepath.Join(metadataRoot, id),
         }
     }
     return apis, nil
diff --git a/pkg/operator/context/context.go b/pkg/operator/context/context.go
index c867b24e7c..d10397c5df 100644
--- a/pkg/operator/context/context.go
+++ b/pkg/operator/context/context.go
@@ -233,7 +233,7 @@ func New(
     }
     ctx.Models = models
 
-    apis, err := getAPIs(userconf, ctx.Models, ctx.DatasetVersion, files, pythonPackages)
+    apis, err := getAPIs(userconf, ctx.Models, ctx.MetadataRoot, ctx.DatasetVersion, files, pythonPackages)
     if err != nil {
         return nil, err
     }
diff --git a/pkg/operator/workloads/metrics.go b/pkg/operator/workloads/metrics.go
index 2ae14174fe..af5408d1f0 100644
--- a/pkg/operator/workloads/metrics.go
+++ b/pkg/operator/workloads/metrics.go
@@ -17,7 +17,9 @@ limitations under the License.
 
 package workloads
 
 import (
+    "encoding/base64"
     "fmt"
+    "path/filepath"
     "strings"
     "time"
 
@@ -398,49 +400,28 @@ func getNetworkStatsDef(appName string, api *context.API, period int64) []*cloud
 }
 
 func getClassesMetricDef(appName string, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) {
-    listMetricsInput := &cloudwatch.ListMetricsInput{
-        Namespace:  aws.String(config.Cortex.LogGroup),
-        MetricName: aws.String("Prediction"),
-        Dimensions: []*cloudwatch.DimensionFilter{
-            {
-                Name:  aws.String("AppName"),
-                Value: aws.String(appName),
-            },
-            {
-                Name:  aws.String("APIName"),
-                Value: aws.String(api.Name),
-            },
-            {
-                Name:  aws.String("APIID"),
-                Value: aws.String(api.ID),
-            },
-        },
-    }
-
-    listMetricsOutput, err := config.AWS.CloudWatchMetrics.ListMetrics(listMetricsInput)
+    prefix := filepath.Join(api.MetadataKey, "classes")
+    classes, err := config.AWS.ListPrefix(prefix, int64(consts.MaxClassesPerRequest))
     if err != nil {
         return nil, err
     }
 
-    if listMetricsOutput.Metrics == nil {
+    if len(classes) == 0 {
         return nil, nil
     }
 
     classMetricQueries := []*cloudwatch.MetricDataQuery{}
-    classCount := 0
-    for i, metric := range listMetricsOutput.Metrics {
-        if classCount >= consts.MaxClassesPerRequest {
-            break
-        }
-
-        var className string
-        for _, dim := range metric.Dimensions {
-            if *dim.Name == "Class" {
-                className = *dim.Value
-            }
+    for i, classObj := range classes {
+        classKey := *classObj.Key
+        urlSplit := strings.Split(classKey, "/")
+        encodedClassName := urlSplit[len(urlSplit)-1]
+        decodedBytes, err := base64.URLEncoding.DecodeString(encodedClassName)
+        if err != nil {
+            return nil, errors.Wrap(err, "encoded class name", encodedClassName)
         }
+        className := string(decodedBytes)
         if len(className) == 0 {
             continue
         }
@@ -448,13 +429,19 @@ func getClassesMetricDef(appName string, api *context.API, period int64) ([]*clo
         classMetricQueries = append(classMetricQueries, &cloudwatch.MetricDataQuery{
             Id: aws.String(fmt.Sprintf("id_%d", i)),
             MetricStat: &cloudwatch.MetricStat{
-                Metric: metric,
+                Metric: &cloudwatch.Metric{
+                    Namespace:  aws.String(config.Cortex.LogGroup),
+                    MetricName: aws.String("Prediction"),
+                    Dimensions: append(getAPIDimensions(appName, api), &cloudwatch.Dimension{
+                        Name:  aws.String("Class"),
+                        Value: aws.String(className),
+                    }),
+                },
                 Stat:   aws.String("Sum"),
                 Period: aws.Int64(period),
             },
             Label: aws.String("class_" + className),
         })
-        classCount++
     }
     return classMetricQueries, nil
 }
diff --git a/pkg/workloads/cortex/lib/api_utils.py b/pkg/workloads/cortex/lib/api_utils.py
index c25f61ff2f..ab5fb7bc4f 100644
--- a/pkg/workloads/cortex/lib/api_utils.py
+++ b/pkg/workloads/cortex/lib/api_utils.py
@@ -13,12 +13,38 @@
 # limitations under the License.
 
+import os
+import base64
+
 from cortex.lib.exceptions import UserException, CortexException
 from cortex.lib.log import get_logger
 
 logger = get_logger()
 
 
+def get_classes(ctx, api_name):
+    api = ctx.apis[api_name]
+    prefix = os.path.join(api["metadata_key"], "classes")
+    class_paths = ctx.storage.search(prefix=prefix)
+    class_set = set()
+    for class_path in class_paths:
+        encoded_class_name = class_path.split("/")[-1]
+        class_set.add(base64.urlsafe_b64decode(encoded_class_name.encode()).decode())
+    return class_set
+
+
+def upload_class(ctx, api_name, class_name):
+    api = ctx.apis[api_name]
+
+    try:
+        ascii_encoded = class_name.encode("ascii")  # cloudwatch only supports ascii
+        encoded_class_name = base64.urlsafe_b64encode(ascii_encoded)
+        key = os.path.join(api["metadata_key"], "classes", encoded_class_name.decode())
+        ctx.storage.put_json("", key)
+    except Exception as e:
+        raise ValueError("unable to store class {}".format(class_name)) from e
+
+
 def api_metric_dimensions(ctx, api_name):
     api = ctx.apis[api_name]
     return [
@@ -42,72 +68,87 @@ def predictions_per_request_metric(dimensions, prediction_count):
     ]
 
 
-def prediction_metrics(dimensions, api, predictions):
-    metric_list = []
+def extract_predicted_values(api, predictions):
+    predicted_values = []
+
     tracker = api.get("tracker")
     for prediction in predictions:
         predicted_value = prediction.get(tracker["key"])
         if predicted_value is None:
-            logger.warn(
+            raise ValueError(
                 "failed to track key '{}': not found in response payload".format(tracker["key"])
             )
-            return []
-
         if tracker["model_type"] == "classification":
-            if type(predicted_value) == str or type(predicted_value) == int:
-                dimensions_with_class = dimensions + [
-                    {"Name": "Class", "Value": str(predicted_value)}
-                ]
-                metric = {
-                    "MetricName": "Prediction",
-                    "Dimensions": dimensions_with_class,
-                    "Unit": "Count",
-                    "Value": 1,
-                }
-
-                metric_list.append(metric)
-            else:
-                logger.warn(
+            if type(predicted_value) != str and type(predicted_value) != int:
+                raise ValueError(
                     "failed to track key '{}': expected type 'str' or 'int' but encountered '{}'".format(
                         tracker["key"], type(predicted_value)
                     )
                 )
-                return []
         else:
-            if type(predicted_value) == float or type(predicted_value) == int:  # allow ints
-                metric = {
-                    "MetricName": "Prediction",
-                    "Dimensions": dimensions,
-                    "Value": float(predicted_value),
-                }
-                metric_list.append(metric)
-            else:
-                logger.warn(
+            if type(predicted_value) != float and type(predicted_value) != int:  # allow ints
+                raise ValueError(
                     "failed to track key '{}': expected type 'float' or 'int' but encountered '{}'".format(
                         tracker["key"], type(predicted_value)
                     )
                 )
-                return []
+        predicted_values.append(predicted_value)
+
+    return predicted_values
+
+
+def prediction_metrics(dimensions, api, predicted_values):
+    metric_list = []
+    tracker = api.get("tracker")
+    for predicted_value in predicted_values:
+        if tracker["model_type"] == "classification":
+            dimensions_with_class = dimensions + [{"Name": "Class", "Value": str(predicted_value)}]
+            metric = {
+                "MetricName": "Prediction",
+                "Dimensions": dimensions_with_class,
+                "Unit": "Count",
+                "Value": 1,
+            }
+
+            metric_list.append(metric)
+        else:
+            metric = {
+                "MetricName": "Prediction",
+                "Dimensions": dimensions,
+                "Value": float(predicted_value),
+            }
+            metric_list.append(metric)
     return metric_list
 
 
-def post_request_metrics(ctx, api, response, predictions):
-    try:
-        api_name = api["name"]
+def cache_classes(ctx, api, predicted_values, class_set):
+    for predicted_value in predicted_values:
+        if predicted_value not in class_set:
+            upload_class(ctx, api["name"], predicted_value)
+            class_set.add(predicted_value)
 
-        api_dimensions = api_metric_dimensions(ctx, api_name)
-        metrics_list = []
-        metrics_list += status_code_metric(api_dimensions, response.status_code)
 
-        if predictions is not None:
-            metrics_list += predictions_per_request_metric(api_dimensions, len(predictions))
+def post_request_metrics(ctx, api, response, predictions, class_set):
+    api_name = api["name"]
+    api_dimensions = api_metric_dimensions(ctx, api_name)
+    metrics_list = []
+    metrics_list += status_code_metric(api_dimensions, response.status_code)
 
-        if api.get("tracker") is not None:
-            metrics_list += prediction_metrics(api_dimensions, api, predictions)
-        ctx.publish_metrics(metrics_list)
+    if predictions is not None:
+        metrics_list += predictions_per_request_metric(api_dimensions, len(predictions))
 
-    except CortexException as e:
-        e.wrap("error")
-        logger.warn(str(e), exc_info=True)
+    if api.get("tracker") is not None:
+        try:
+            predicted_values = extract_predicted_values(api, predictions)
+
+            if api["tracker"]["model_type"] == "classification":
+                cache_classes(ctx, api, predicted_values, class_set)
+
+            metrics_list += prediction_metrics(api_dimensions, api, predicted_values)
+        except Exception as e:
+            logger.warn(str(e), exc_info=True)
+
+    try:
+        ctx.publish_metrics(metrics_list)
     except Exception as e:
         logger.warn(str(e), exc_info=True)
diff --git a/pkg/workloads/cortex/onnx_serve/api.py b/pkg/workloads/cortex/onnx_serve/api.py
index b8183465c8..db62e83367 100644
--- a/pkg/workloads/cortex/onnx_serve/api.py
+++ b/pkg/workloads/cortex/onnx_serve/api.py
@@ -63,6 +63,7 @@
     "input_metadata": None,
     "output_metadata": None,
     "request_handler": None,
+    "class_set": set(),
 }
 
 
@@ -79,7 +80,7 @@ def after_request(response):
     predictions = None
     if "predictions" in g:
         predictions = g.predictions
-    api_utils.post_request_metrics(ctx, api, response, predictions)
+    api_utils.post_request_metrics(ctx, api, response, predictions, local_cache["class_set"])
 
     return response
 
@@ -265,6 +266,12 @@ def start(args):
         )
         sys.exit(1)
 
+    if api.get("tracker") is not None and api["tracker"].get("model_type") == "classification":
+        try:
+            local_cache["class_set"] = api_utils.get_classes(ctx, api["name"])
+        except Exception as e:
+            logger.warn("An error occurred while attempting to load classes", exc_info=True)
+
     serve(app, listen="*:{}".format(args.port))
diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py
index c3783ea6db..256af9e421 100644
--- a/pkg/workloads/cortex/tf_api/api.py
+++ b/pkg/workloads/cortex/tf_api/api.py
@@ -53,6 +53,7 @@
     "required_inputs": None,
     "metadata": None,
     "target_vocab_populated": None,
+    "class_set": set(),
 }
 
 DTYPE_TO_VALUE_KEY = {
@@ -107,7 +108,7 @@ def after_request(response):
     predictions = None
     if "predictions" in g:
         predictions = g.predictions
-    api_utils.post_request_metrics(ctx, api, response, predictions)
+    api_utils.post_request_metrics(ctx, api, response, predictions, local_cache["class_set"])
 
     return response
 
@@ -577,6 +578,12 @@ def start(args):
         logger.exception(e)
         sys.exit(1)
 
+    if api.get("tracker") is not None and api["tracker"].get("model_type") == "classification":
+        try:
+            local_cache["class_set"] = api_utils.get_classes(ctx, api["name"])
+        except Exception as e:
+            logger.warn("An error occurred while attempting to load classes", exc_info=True)
+
     channel = grpc.insecure_channel("localhost:" + str(args.tf_serve_port))
     local_cache["stub"] = prediction_service_pb2_grpc.PredictionServiceStub(channel)
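
For readers following the first patch: the "metadata store" is nothing more than empty S3 objects whose keys encode the class names. Below is a minimal, self-contained sketch of that round trip. It is not part of the patch; the metadata key value and helper names are made up for illustration, but the layout "<metadata key>/classes/<base64url(class name)>" mirrors upload_class() in api_utils.py and the decoding loop in getClassesMetricDef() above.

// Sketch only (not part of the patch): how a class name round-trips through the
// S3 key layout introduced above. base64 with the URL-safe alphabet keeps
// arbitrary class names valid as S3 key segments.
package main

import (
    "encoding/base64"
    "fmt"
    "path/filepath"
    "strings"
)

// classKey builds "<metadataKey>/classes/<base64url(className)>", mirroring upload_class().
func classKey(metadataKey, className string) string {
    encoded := base64.URLEncoding.EncodeToString([]byte(className))
    return filepath.Join(metadataKey, "classes", encoded)
}

// classNameFromKey reverses classKey by decoding the last path segment,
// mirroring the loop in getClassesMetricDef().
func classNameFromKey(key string) (string, error) {
    parts := strings.Split(key, "/")
    decoded, err := base64.URLEncoding.DecodeString(parts[len(parts)-1])
    if err != nil {
        return "", err
    }
    return string(decoded), nil
}

func main() {
    key := classKey("apps/iris/metadata/abc123", "setosa") // hypothetical metadata key
    name, _ := classNameFromKey(key)
    fmt.Println(key)  // apps/iris/metadata/abc123/classes/c2V0b3Nh
    fmt.Println(name) // setosa
}

Because each class becomes its own zero-byte object, listing the "classes/" prefix is enough to enumerate every class ever predicted, without a separate index file.
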
From 086b74a98e70ad9333d9e449dabf82b131cc933e Mon Sep 17 00:00:00 2001
From: vishal
Date: Wed, 21 Aug 2019 18:27:03 -0400
Subject: [PATCH 2/6] Fix linting

---
 pkg/operator/context/apis.go       | 1 -
 pkg/workloads/cortex/tf_api/api.py | 9 ++++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pkg/operator/context/apis.go b/pkg/operator/context/apis.go
index f8bc0e8de0..832ac1de2c 100644
--- a/pkg/operator/context/apis.go
+++ b/pkg/operator/context/apis.go
@@ -36,7 +36,6 @@ var uploadedRequestHandlers = strset.New()
 
 func getAPIs(config *userconfig.Config,
     metadataRoot string,
-    datasetVersion string,
     deploymentVersion string,
     impls map[string][]byte,
     pythonPackages context.PythonPackages,
diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py
index f4923a14eb..103c378310 100644
--- a/pkg/workloads/cortex/tf_api/api.py
+++ b/pkg/workloads/cortex/tf_api/api.py
@@ -39,7 +39,14 @@
 
 app.json_encoder = util.json_tricks_encoder
 
-local_cache = {"ctx": None, "stub": None, "api": None, "metadata": None, "request_handler": None, "class_set": set()}
+local_cache = {
+    "ctx": None,
+    "stub": None,
+    "api": None,
+    "metadata": None,
+    "request_handler": None,
+    "class_set": set(),
+}
 
 DTYPE_TO_VALUE_KEY = {

From 637c4c2ea8dca865fbfc512d5c52ba0c23eeba76 Mon Sep 17 00:00:00 2001
From: vishal
Date: Thu, 22 Aug 2019 12:47:58 +0000
Subject: [PATCH 3/6] Add put_str to storage functions

---
 pkg/workloads/cortex/lib/storage/local.py | 10 ++++++++--
 pkg/workloads/cortex/lib/storage/s3.py    |  3 +++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/pkg/workloads/cortex/lib/storage/local.py b/pkg/workloads/cortex/lib/storage/local.py
index 346798ad84..15df92a6c6 100644
--- a/pkg/workloads/cortex/lib/storage/local.py
+++ b/pkg/workloads/cortex/lib/storage/local.py
@@ -70,9 +70,15 @@ def search(self, prefix="", suffix=""):
                 files.append(filename)
         return files
 
-    def put_json(self, obj, key):
+    def _put_str(self, str_val, key):
         f = self._get_or_create_path(key)
-        f.write_text(json.dumps(obj))
+        f.write_text(str_val)
+
+    def put_str(self, str_val, key):
+        self._put_str(str_val, key)
+
+    def put_json(self, obj, key):
+        self._put_str(json.dumps(obj), key)
 
     def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
         f = self._get_path_if_exists(
diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py
index ed7b37a55a..b72ec1a0e5 100644
--- a/pkg/workloads/cortex/lib/storage/s3.py
+++ b/pkg/workloads/cortex/lib/storage/s3.py
@@ -140,6 +140,9 @@ def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None):
     def search(self, prefix="", suffix=""):
         return list(self._get_matching_s3_keys_generator(prefix, suffix))
 
+    def put_str(self, str_val, key):
+        self._upload_string_to_s3(str_val, key)
+
     def put_json(self, obj, key):
         self._upload_string_to_s3(json.dumps(obj), key)
 
From 07d3534b54bf2ddadf300b0ff9ade6dff45049b7 Mon Sep 17 00:00:00 2001
From: vishal
Date: Thu, 22 Aug 2019 20:10:40 -0400
Subject: [PATCH 4/6] Remove old metadata functions and derive metadata path from context metadata_root

---
 pkg/operator/api/context/apis.go      |  1 -
 pkg/operator/context/apis.go          |  2 --
 pkg/operator/context/context.go       |  2 +-
 pkg/operator/workloads/metrics.go     | 22 +++++++++++-----------
 pkg/workloads/cortex/lib/api_utils.py |  4 ++--
 pkg/workloads/cortex/lib/context.py   | 22 +---------------------
 6 files changed, 15 insertions(+), 38 deletions(-)

diff --git a/pkg/operator/api/context/apis.go b/pkg/operator/api/context/apis.go
index 7320f7b4fb..2f4d47c074 100644
--- a/pkg/operator/api/context/apis.go
+++ b/pkg/operator/api/context/apis.go
@@ -27,7 +27,6 @@ type API struct {
     *ComputedResourceFields
     Path                  string  `json:"path"`
     RequestHandlerImplKey *string `json:"request_handler_impl_key"`
-    MetadataKey           string  `json:"metadata_key"`
 }
 
 func APIPath(apiName string, appName string) string {
diff --git a/pkg/operator/context/apis.go b/pkg/operator/context/apis.go
index 124481725c..16fea5cfe5 100644
--- a/pkg/operator/context/apis.go
+++ b/pkg/operator/context/apis.go
@@ -36,7 +36,6 @@ import (
 var uploadedRequestHandlers = strset.New()
 
 func getAPIs(config *userconfig.Config,
-    metadataRoot string,
     deploymentVersion string,
     impls map[string][]byte,
     pythonPackages context.PythonPackages,
@@ -85,7 +84,6 @@ func getAPIs(config *userconfig.Config,
             API:                   apiConfig,
             Path:                  context.APIPath(apiConfig.Name, config.App.Name),
             RequestHandlerImplKey: requestHandlerImplKey,
-            MetadataKey:           filepath.Join(metadataRoot, id),
         }
     }
     return apis, nil
diff --git a/pkg/operator/context/context.go b/pkg/operator/context/context.go
index b05f334067..f005a37d2d 100644
--- a/pkg/operator/context/context.go
+++ b/pkg/operator/context/context.go
@@ -68,7 +68,7 @@ func New(
     }
     ctx.PythonPackages = pythonPackages
 
-    apis, err := getAPIs(userconf, ctx.MetadataRoot, ctx.DeploymentVersion, files, pythonPackages)
+    apis, err := getAPIs(userconf, ctx.DeploymentVersion, files, pythonPackages)
     if err != nil {
         return nil, err
     }
diff --git a/pkg/operator/workloads/metrics.go b/pkg/operator/workloads/metrics.go
index af5408d1f0..8ee7922e08 100644
--- a/pkg/operator/workloads/metrics.go
+++ b/pkg/operator/workloads/metrics.go
@@ -64,7 +64,7 @@ func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) {
     requestList := []func() error{}
 
     if realTimeStart.Before(realTimeEnd) {
-        requestList = append(requestList, getAPIMetricsFunc(appName, api, 1, &realTimeStart, &realTimeEnd, &realTimeMetrics))
+        requestList = append(requestList, getAPIMetricsFunc(ctx, api, 1, &realTimeStart, &realTimeEnd, &realTimeMetrics))
     }
 
     if apiStartTime.Before(realTimeStart) {
@@ -77,7 +77,7 @@ func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) {
         } else {
             batchStart = twoWeeksAgo
         }
-        requestList = append(requestList, getAPIMetricsFunc(appName, api, 60*60, &batchStart, &batchEnd, &batchMetrics))
+        requestList = append(requestList, getAPIMetricsFunc(ctx, api, 60*60, &batchStart, &batchEnd, &batchMetrics))
     }
 
     if len(requestList) != 0 {
@@ -91,9 +91,9 @@ func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) {
     return &mergedMetrics, nil
 }
 
-func getAPIMetricsFunc(appName string, api *context.API, period int64, startTime *time.Time, endTime *time.Time, apiMetrics *schema.APIMetrics) func() error {
+func getAPIMetricsFunc(ctx *context.Context, api *context.API, period int64, startTime *time.Time, endTime *time.Time, apiMetrics *schema.APIMetrics) func() error {
     return func() error {
-        metricDataResults, err := queryMetrics(appName, api, period, startTime, endTime)
+        metricDataResults, err := queryMetrics(ctx, api, period, startTime, endTime)
         if err != nil {
             return err
         }
@@ -118,20 +118,20 @@ func getAPIMetricsFunc(appName string, api *context.API, period int64, startTime
     }
 }
 
-func queryMetrics(appName string, api *context.API, period int64, startTime *time.Time, endTime *time.Time) ([]*cloudwatch.MetricDataResult, error) {
-    networkDataQueries := getNetworkStatsDef(appName, api, period)
+func queryMetrics(ctx *context.Context, api *context.API, period int64, startTime *time.Time, endTime *time.Time) ([]*cloudwatch.MetricDataResult, error) {
+    networkDataQueries := getNetworkStatsDef(ctx.App.Name, api, period)
     latencyMetrics := getLatencyMetricsDef(api.Path, period)
     allMetrics := append(latencyMetrics, networkDataQueries...)
 
     if api.Tracker != nil {
         if api.Tracker.ModelType == userconfig.ClassificationModelType {
-            classMetrics, err := getClassesMetricDef(appName, api, period)
+            classMetrics, err := getClassesMetricDef(ctx, api, period)
             if err != nil {
                 return nil, err
             }
             allMetrics = append(allMetrics, classMetrics...)
         } else {
-            regressionMetrics := getRegressionMetricDef(appName, api, period)
+            regressionMetrics := getRegressionMetricDef(ctx.App.Name, api, period)
             allMetrics = append(allMetrics, regressionMetrics...)
         }
     }
@@ -399,8 +399,8 @@ func getNetworkStatsDef(appName string, api *context.API, period int64) []*cloud
     return networkDataQueries
 }
 
-func getClassesMetricDef(appName string, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) {
-    prefix := filepath.Join(api.MetadataKey, "classes")
+func getClassesMetricDef(ctx *context.Context, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) {
+    prefix := filepath.Join(ctx.MetadataRoot, api.ID, "classes")
     classes, err := config.AWS.ListPrefix(prefix, int64(consts.MaxClassesPerRequest))
     if err != nil {
         return nil, err
@@ -432,7 +432,7 @@ func getClassesMetricDef(ctx *context.Context, api *context.API, period int64) ([]*clo
                 Metric: &cloudwatch.Metric{
                     Namespace:  aws.String(config.Cortex.LogGroup),
                     MetricName: aws.String("Prediction"),
-                    Dimensions: append(getAPIDimensions(appName, api), &cloudwatch.Dimension{
+                    Dimensions: append(getAPIDimensions(ctx.App.Name, api), &cloudwatch.Dimension{
                         Name:  aws.String("Class"),
                         Value: aws.String(className),
                     }),
diff --git a/pkg/workloads/cortex/lib/api_utils.py b/pkg/workloads/cortex/lib/api_utils.py
index ab5fb7bc4f..cb0c3dea81 100644
--- a/pkg/workloads/cortex/lib/api_utils.py
+++ b/pkg/workloads/cortex/lib/api_utils.py
@@ -24,7 +24,7 @@ def get_classes(ctx, api_name):
     api = ctx.apis[api_name]
-    prefix = os.path.join(api["metadata_key"], "classes")
+    prefix = os.path.join(ctx.metadata_root, api["id"], "classes")
     class_paths = ctx.storage.search(prefix=prefix)
     class_set = set()
     for class_path in class_paths:
@@ -39,7 +39,7 @@ def upload_class(ctx, api_name, class_name):
     try:
         ascii_encoded = class_name.encode("ascii")  # cloudwatch only supports ascii
         encoded_class_name = base64.urlsafe_b64encode(ascii_encoded)
-        key = os.path.join(api["metadata_key"], "classes", encoded_class_name.decode())
+        key = os.path.join(ctx.metadata_root, api["id"], "classes", encoded_class_name.decode())
         ctx.storage.put_json("", key)
     except Exception as e:
         raise ValueError("unable to store class {}".format(class_name)) from e
diff --git a/pkg/workloads/cortex/lib/context.py b/pkg/workloads/cortex/lib/context.py
index d072281661..dda42f1c6f 100644
--- a/pkg/workloads/cortex/lib/context.py
+++ b/pkg/workloads/cortex/lib/context.py
@@ -56,6 +56,7 @@ def __init__(self, **kwargs):
 
         self.id = self.ctx["id"]
         self.key = self.ctx["key"]
+        self.metadata_root = self.ctx["metadata_root"]
         self.cortex_config = self.ctx["cortex_config"]
         self.deployment_version = self.ctx["deployment_version"]
         self.root = self.ctx["root"]
@@ -83,9 +84,6 @@ def __init__(self, **kwargs):
             )
         )
 
-        # Internal caches
-        self._metadatas = {}
-
         # This affects Tensorflow S3 access
         os.environ["AWS_REGION"] = self.cortex_config.get("region", "")
 
@@ -192,24 +190,6 @@ def upload_resource_status_end(self, exit_code, *resources):
     def resource_status_key(self, resource):
         return os.path.join(self.status_prefix, resource["id"], resource["workload_id"])
 
-    def get_metadata_url(self, resource_id):
-        return os.path.join(self.ctx["metadata_root"], resource_id + ".json")
-
-    def write_metadata(self, resource_id, metadata):
-        if resource_id in self._metadatas and self._metadatas[resource_id] == metadata:
-            return
-
-        self._metadatas[resource_id] = metadata
-        self.storage.put_json(metadata, self.get_metadata_url(resource_id))
-
-    def get_metadata(self, resource_id, use_cache=True):
-        if use_cache and resource_id in self._metadatas:
-            return self._metadatas[resource_id]
-
-        metadata = self.storage.get_json(self.get_metadata_url(resource_id), allow_missing=True)
-        self._metadatas[resource_id] = metadata
-        return metadata
-
     def publish_metrics(self, metrics):
         if self.monitoring is None:
             raise CortexException("monitoring client not initialized")  # unexpected

From fa92d26842b2a77c8af0e5367abec9b7e4b60c64 Mon Sep 17 00:00:00 2001
From: vishal
Date: Fri, 23 Aug 2019 09:59:16 -0400
Subject: [PATCH 5/6] Add trailing slash

---
 pkg/operator/workloads/metrics.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/operator/workloads/metrics.go b/pkg/operator/workloads/metrics.go
index 8ee7922e08..f450945547 100644
--- a/pkg/operator/workloads/metrics.go
+++ b/pkg/operator/workloads/metrics.go
@@ -400,7 +400,7 @@ func getNetworkStatsDef(appName string, api *context.API, period int64) []*cloud
 }
 
 func getClassesMetricDef(ctx *context.Context, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) {
-    prefix := filepath.Join(ctx.MetadataRoot, api.ID, "classes")
+    prefix := filepath.Join(ctx.MetadataRoot, api.ID, "classes/")
     classes, err := config.AWS.ListPrefix(prefix, int64(consts.MaxClassesPerRequest))
     if err != nil {
         return nil, err

From 9f2d9b8fe73e5ac813fbfd2c1e6c377954fb90ad Mon Sep 17 00:00:00 2001
From: vishal
Date: Fri, 23 Aug 2019 16:29:48 -0400
Subject: [PATCH 6/6] Add trailing to prefix correctly

---
 pkg/operator/workloads/metrics.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/operator/workloads/metrics.go b/pkg/operator/workloads/metrics.go
index f450945547..3a7bb9123f 100644
--- a/pkg/operator/workloads/metrics.go
+++ b/pkg/operator/workloads/metrics.go
@@ -400,7 +400,7 @@ func getNetworkStatsDef(appName string, api *context.API, period int64) []*cloud
 }
 
 func getClassesMetricDef(ctx *context.Context, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) {
-    prefix := filepath.Join(ctx.MetadataRoot, api.ID, "classes/")
+    prefix := filepath.Join(ctx.MetadataRoot, api.ID, "classes") + "/"
    classes, err := config.AWS.ListPrefix(prefix, int64(consts.MaxClassesPerRequest))
     if err != nil {
         return nil, err
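
A note on why the last two commits touch the same line: Go's filepath.Join cleans its result, so a trailing slash written inside the Join call (patch 5) is silently stripped, while appending "/" after the Join (patch 6) actually yields a delimiter-terminated S3 list prefix. A tiny sketch, not part of the patches (the "metadata/api-id" prefix is made up):

// Sketch only: filepath.Join drops a trailing slash, so it must be appended afterwards.
package main

import (
    "fmt"
    "path/filepath"
)

func main() {
    fmt.Println(filepath.Join("metadata/api-id", "classes/"))      // metadata/api-id/classes
    fmt.Println(filepath.Join("metadata/api-id", "classes") + "/") // metadata/api-id/classes/
}

Ending the prefix with "/" keeps the listing from matching sibling keys that merely start with the string "classes".
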