diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index 74f73cb957..b2d856cf20 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -239,6 +239,21 @@ func (c *Client) ReadBytesFromS3(key string) ([]byte, error) { return buf.Bytes(), nil } +func (c *Client) ListPrefix(prefix string, maxResults int64) ([]*s3.Object, error) { + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(c.Bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(maxResults), + } + + output, err := c.s3Client.ListObjectsV2(listObjectsInput) + if err != nil { + return nil, errors.Wrap(err, prefix) + } + + return output.Contents, nil +} + func (c *Client) DeleteFromS3ByPrefix(prefix string, continueIfFailure bool) error { listObjectsInput := &s3.ListObjectsV2Input{ Bucket: aws.String(c.Bucket), diff --git a/pkg/operator/context/context.go b/pkg/operator/context/context.go index de108f3861..f005a37d2d 100644 --- a/pkg/operator/context/context.go +++ b/pkg/operator/context/context.go @@ -69,6 +69,7 @@ func New( ctx.PythonPackages = pythonPackages apis, err := getAPIs(userconf, ctx.DeploymentVersion, files, pythonPackages) + if err != nil { return nil, err } diff --git a/pkg/operator/workloads/metrics.go b/pkg/operator/workloads/metrics.go index 2ae14174fe..3a7bb9123f 100644 --- a/pkg/operator/workloads/metrics.go +++ b/pkg/operator/workloads/metrics.go @@ -17,7 +17,9 @@ limitations under the License. package workloads import ( + "encoding/base64" "fmt" + "path/filepath" "strings" "time" @@ -62,7 +64,7 @@ func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) { requestList := []func() error{} if realTimeStart.Before(realTimeEnd) { - requestList = append(requestList, getAPIMetricsFunc(appName, api, 1, &realTimeStart, &realTimeEnd, &realTimeMetrics)) + requestList = append(requestList, getAPIMetricsFunc(ctx, api, 1, &realTimeStart, &realTimeEnd, &realTimeMetrics)) } if apiStartTime.Before(realTimeStart) { @@ -75,7 +77,7 @@ func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) { } else { batchStart = twoWeeksAgo } - requestList = append(requestList, getAPIMetricsFunc(appName, api, 60*60, &batchStart, &batchEnd, &batchMetrics)) + requestList = append(requestList, getAPIMetricsFunc(ctx, api, 60*60, &batchStart, &batchEnd, &batchMetrics)) } if len(requestList) != 0 { @@ -89,9 +91,9 @@ func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) { return &mergedMetrics, nil } -func getAPIMetricsFunc(appName string, api *context.API, period int64, startTime *time.Time, endTime *time.Time, apiMetrics *schema.APIMetrics) func() error { +func getAPIMetricsFunc(ctx *context.Context, api *context.API, period int64, startTime *time.Time, endTime *time.Time, apiMetrics *schema.APIMetrics) func() error { return func() error { - metricDataResults, err := queryMetrics(appName, api, period, startTime, endTime) + metricDataResults, err := queryMetrics(ctx, api, period, startTime, endTime) if err != nil { return err } @@ -116,20 +118,20 @@ func getAPIMetricsFunc(appName string, api *context.API, period int64, startTime } } -func queryMetrics(appName string, api *context.API, period int64, startTime *time.Time, endTime *time.Time) ([]*cloudwatch.MetricDataResult, error) { - networkDataQueries := getNetworkStatsDef(appName, api, period) +func queryMetrics(ctx *context.Context, api *context.API, period int64, startTime *time.Time, endTime *time.Time) ([]*cloudwatch.MetricDataResult, error) { + networkDataQueries := getNetworkStatsDef(ctx.App.Name, api, period) latencyMetrics := getLatencyMetricsDef(api.Path, period) allMetrics := append(latencyMetrics, networkDataQueries...) if api.Tracker != nil { if api.Tracker.ModelType == userconfig.ClassificationModelType { - classMetrics, err := getClassesMetricDef(appName, api, period) + classMetrics, err := getClassesMetricDef(ctx, api, period) if err != nil { return nil, err } allMetrics = append(allMetrics, classMetrics...) } else { - regressionMetrics := getRegressionMetricDef(appName, api, period) + regressionMetrics := getRegressionMetricDef(ctx.App.Name, api, period) allMetrics = append(allMetrics, regressionMetrics...) } } @@ -397,50 +399,29 @@ func getNetworkStatsDef(appName string, api *context.API, period int64) []*cloud return networkDataQueries } -func getClassesMetricDef(appName string, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) { - listMetricsInput := &cloudwatch.ListMetricsInput{ - Namespace: aws.String(config.Cortex.LogGroup), - MetricName: aws.String("Prediction"), - Dimensions: []*cloudwatch.DimensionFilter{ - { - Name: aws.String("AppName"), - Value: aws.String(appName), - }, - { - Name: aws.String("APIName"), - Value: aws.String(api.Name), - }, - { - Name: aws.String("APIID"), - Value: aws.String(api.ID), - }, - }, - } - - listMetricsOutput, err := config.AWS.CloudWatchMetrics.ListMetrics(listMetricsInput) +func getClassesMetricDef(ctx *context.Context, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) { + prefix := filepath.Join(ctx.MetadataRoot, api.ID, "classes") + "/" + classes, err := config.AWS.ListPrefix(prefix, int64(consts.MaxClassesPerRequest)) if err != nil { return nil, err } - if listMetricsOutput.Metrics == nil { + if len(classes) == 0 { return nil, nil } classMetricQueries := []*cloudwatch.MetricDataQuery{} - classCount := 0 - for i, metric := range listMetricsOutput.Metrics { - if classCount >= consts.MaxClassesPerRequest { - break - } - - var className string - for _, dim := range metric.Dimensions { - if *dim.Name == "Class" { - className = *dim.Value - } + for i, classObj := range classes { + classKey := *classObj.Key + urlSplit := strings.Split(classKey, "/") + encodedClassName := urlSplit[len(urlSplit)-1] + decodedBytes, err := base64.URLEncoding.DecodeString(encodedClassName) + if err != nil { + return nil, errors.Wrap(err, "encoded class name", encodedClassName) } + className := string(decodedBytes) if len(className) == 0 { continue } @@ -448,13 +429,19 @@ func getClassesMetricDef(appName string, api *context.API, period int64) ([]*clo classMetricQueries = append(classMetricQueries, &cloudwatch.MetricDataQuery{ Id: aws.String(fmt.Sprintf("id_%d", i)), MetricStat: &cloudwatch.MetricStat{ - Metric: metric, + Metric: &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("Prediction"), + Dimensions: append(getAPIDimensions(ctx.App.Name, api), &cloudwatch.Dimension{ + Name: aws.String("Class"), + Value: aws.String(className), + }), + }, Stat: aws.String("Sum"), Period: aws.Int64(period), }, Label: aws.String("class_" + className), }) - classCount++ } return classMetricQueries, nil } diff --git a/pkg/workloads/cortex/lib/api_utils.py b/pkg/workloads/cortex/lib/api_utils.py index c25f61ff2f..cb0c3dea81 100644 --- a/pkg/workloads/cortex/lib/api_utils.py +++ b/pkg/workloads/cortex/lib/api_utils.py @@ -13,12 +13,38 @@ # limitations under the License. +import os +import base64 + from cortex.lib.exceptions import UserException, CortexException from cortex.lib.log import get_logger logger = get_logger() +def get_classes(ctx, api_name): + api = ctx.apis[api_name] + prefix = os.path.join(ctx.metadata_root, api["id"], "classes") + class_paths = ctx.storage.search(prefix=prefix) + class_set = set() + for class_path in class_paths: + encoded_class_name = class_path.split("/")[-1] + class_set.add(base64.urlsafe_b64decode(encoded_class_name.encode()).decode()) + return class_set + + +def upload_class(ctx, api_name, class_name): + api = ctx.apis[api_name] + + try: + ascii_encoded = class_name.encode("ascii") # cloudwatch only supports ascii + encoded_class_name = base64.urlsafe_b64encode(ascii_encoded) + key = os.path.join(ctx.metadata_root, api["id"], "classes", encoded_class_name.decode()) + ctx.storage.put_json("", key) + except Exception as e: + raise ValueError("unable to store class {}".format(class_name)) from e + + def api_metric_dimensions(ctx, api_name): api = ctx.apis[api_name] return [ @@ -42,72 +68,87 @@ def predictions_per_request_metric(dimensions, prediction_count): ] -def prediction_metrics(dimensions, api, predictions): - metric_list = [] +def extract_predicted_values(api, predictions): + predicted_values = [] + tracker = api.get("tracker") for prediction in predictions: predicted_value = prediction.get(tracker["key"]) if predicted_value is None: - logger.warn( + raise ValueError( "failed to track key '{}': not found in response payload".format(tracker["key"]) ) - return [] - if tracker["model_type"] == "classification": - if type(predicted_value) == str or type(predicted_value) == int: - dimensions_with_class = dimensions + [ - {"Name": "Class", "Value": str(predicted_value)} - ] - metric = { - "MetricName": "Prediction", - "Dimensions": dimensions_with_class, - "Unit": "Count", - "Value": 1, - } - - metric_list.append(metric) - else: - logger.warn( + if type(predicted_value) != str and type(predicted_value) != int: + raise ValueError( "failed to track key '{}': expected type 'str' or 'int' but encountered '{}'".format( tracker["key"], type(predicted_value) ) ) - return [] else: - if type(predicted_value) == float or type(predicted_value) == int: # allow ints - metric = { - "MetricName": "Prediction", - "Dimensions": dimensions, - "Value": float(predicted_value), - } - metric_list.append(metric) - else: - logger.warn( + if type(predicted_value) != float and type(predicted_value) != int: # allow ints + raise ValueError( "failed to track key '{}': expected type 'float' or 'int' but encountered '{}'".format( tracker["key"], type(predicted_value) ) ) - return [] + predicted_values.append(predicted_value) + + return predicted_values + + +def prediction_metrics(dimensions, api, predicted_values): + metric_list = [] + tracker = api.get("tracker") + for predicted_value in predicted_values: + if tracker["model_type"] == "classification": + dimensions_with_class = dimensions + [{"Name": "Class", "Value": str(predicted_value)}] + metric = { + "MetricName": "Prediction", + "Dimensions": dimensions_with_class, + "Unit": "Count", + "Value": 1, + } + + metric_list.append(metric) + else: + metric = { + "MetricName": "Prediction", + "Dimensions": dimensions, + "Value": float(predicted_value), + } + metric_list.append(metric) return metric_list -def post_request_metrics(ctx, api, response, predictions): - try: - api_name = api["name"] +def cache_classes(ctx, api, predicted_values, class_set): + for predicted_value in predicted_values: + if predicted_value not in class_set: + upload_class(ctx, api["name"], predicted_value) + class_set.add(predicted_value) - api_dimensions = api_metric_dimensions(ctx, api_name) - metrics_list = [] - metrics_list += status_code_metric(api_dimensions, response.status_code) - if predictions is not None: - metrics_list += predictions_per_request_metric(api_dimensions, len(predictions)) +def post_request_metrics(ctx, api, response, predictions, class_set): + api_name = api["name"] + api_dimensions = api_metric_dimensions(ctx, api_name) + metrics_list = [] + metrics_list += status_code_metric(api_dimensions, response.status_code) - if api.get("tracker") is not None: - metrics_list += prediction_metrics(api_dimensions, api, predictions) - ctx.publish_metrics(metrics_list) + if predictions is not None: + metrics_list += predictions_per_request_metric(api_dimensions, len(predictions)) - except CortexException as e: - e.wrap("error") - logger.warn(str(e), exc_info=True) + if api.get("tracker") is not None: + try: + predicted_values = extract_predicted_values(api, predictions) + + if api["tracker"]["model_type"] == "classification": + cache_classes(ctx, api, predicted_values, class_set) + + metrics_list += prediction_metrics(api_dimensions, api, predicted_values) + except Exception as e: + logger.warn(str(e), exc_info=True) + + try: + ctx.publish_metrics(metrics_list) except Exception as e: logger.warn(str(e), exc_info=True) diff --git a/pkg/workloads/cortex/lib/context.py b/pkg/workloads/cortex/lib/context.py index d072281661..dda42f1c6f 100644 --- a/pkg/workloads/cortex/lib/context.py +++ b/pkg/workloads/cortex/lib/context.py @@ -56,6 +56,7 @@ def __init__(self, **kwargs): self.id = self.ctx["id"] self.key = self.ctx["key"] + self.metadata_root = self.ctx["metadata_root"] self.cortex_config = self.ctx["cortex_config"] self.deployment_version = self.ctx["deployment_version"] self.root = self.ctx["root"] @@ -83,9 +84,6 @@ def __init__(self, **kwargs): ) ) - # Internal caches - self._metadatas = {} - # This affects Tensorflow S3 access os.environ["AWS_REGION"] = self.cortex_config.get("region", "") @@ -192,24 +190,6 @@ def upload_resource_status_end(self, exit_code, *resources): def resource_status_key(self, resource): return os.path.join(self.status_prefix, resource["id"], resource["workload_id"]) - def get_metadata_url(self, resource_id): - return os.path.join(self.ctx["metadata_root"], resource_id + ".json") - - def write_metadata(self, resource_id, metadata): - if resource_id in self._metadatas and self._metadatas[resource_id] == metadata: - return - - self._metadatas[resource_id] = metadata - self.storage.put_json(metadata, self.get_metadata_url(resource_id)) - - def get_metadata(self, resource_id, use_cache=True): - if use_cache and resource_id in self._metadatas: - return self._metadatas[resource_id] - - metadata = self.storage.get_json(self.get_metadata_url(resource_id), allow_missing=True) - self._metadatas[resource_id] = metadata - return metadata - def publish_metrics(self, metrics): if self.monitoring is None: raise CortexException("monitoring client not initialized") # unexpected diff --git a/pkg/workloads/cortex/lib/storage/local.py b/pkg/workloads/cortex/lib/storage/local.py index 346798ad84..15df92a6c6 100644 --- a/pkg/workloads/cortex/lib/storage/local.py +++ b/pkg/workloads/cortex/lib/storage/local.py @@ -70,9 +70,15 @@ def search(self, prefix="", suffix=""): files.append(filename) return files - def put_json(self, obj, key): + def _put_str(self, str_val, key): f = self._get_or_create_path(key) - f.write_text(json.dumps(obj)) + f.write_text(str_val) + + def put_str(self, str_val, key): + self._put_str(str_val, key) + + def put_json(self, obj, key): + self._put_str(json.dumps(obj), key) def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): f = self._get_path_if_exists( diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 919edc2813..95e930c999 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -140,6 +140,9 @@ def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None): def search(self, prefix="", suffix=""): return list(self._get_matching_s3_keys_generator(prefix, suffix)) + def put_str(self, str_val, key): + self._upload_string_to_s3(str_val, key) + def put_json(self, obj, key): self._upload_string_to_s3(json.dumps(obj), key) diff --git a/pkg/workloads/cortex/onnx_serve/api.py b/pkg/workloads/cortex/onnx_serve/api.py index f929fc227b..e0aefe45e3 100644 --- a/pkg/workloads/cortex/onnx_serve/api.py +++ b/pkg/workloads/cortex/onnx_serve/api.py @@ -58,6 +58,7 @@ "input_metadata": None, "output_metadata": None, "request_handler": None, + "class_set": set(), } @@ -74,7 +75,7 @@ def after_request(response): predictions = None if "predictions" in g: predictions = g.predictions - api_utils.post_request_metrics(ctx, api, response, predictions) + api_utils.post_request_metrics(ctx, api, response, predictions, local_cache["class_set"]) return response @@ -261,6 +262,12 @@ def start(args): ) sys.exit(1) + if api.get("tracker") is not None and api["tracker"].get("model_type") == "classification": + try: + local_cache["class_set"] = api_utils.get_classes(ctx, api["name"]) + except Exception as e: + logger.warn("An error occurred while attempting to load classes", exc_info=True) + serve(app, listen="*:{}".format(args.port)) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index bfa3815254..1cf925bc6b 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -38,7 +38,16 @@ app = Flask(__name__) app.json_encoder = util.json_tricks_encoder -local_cache = {"ctx": None, "stub": None, "api": None, "metadata": None, "request_handler": None} + +local_cache = { + "ctx": None, + "stub": None, + "api": None, + "metadata": None, + "request_handler": None, + "class_set": set(), +} + DTYPE_TO_VALUE_KEY = { "DT_INT32": "intVal", @@ -92,7 +101,7 @@ def after_request(response): predictions = None if "predictions" in g: predictions = g.predictions - api_utils.post_request_metrics(ctx, api, response, predictions) + api_utils.post_request_metrics(ctx, api, response, predictions, local_cache["class_set"]) return response @@ -344,6 +353,12 @@ def start(args): logger.exception(e) sys.exit(1) + if api.get("tracker") is not None and api["tracker"].get("model_type") == "classification": + try: + local_cache["class_set"] = api_utils.get_classes(ctx, api["name"]) + except Exception as e: + logger.warn("An error occurred while attempting to load classes", exc_info=True) + channel = grpc.insecure_channel("localhost:" + str(args.tf_serve_port)) local_cache["stub"] = prediction_service_pb2_grpc.PredictionServiceStub(channel)