From 197e3337cc4d69790a415759e7ac98a349fce106 Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 23 Aug 2019 12:40:34 -0400 Subject: [PATCH 01/14] make tf paths easier --- pkg/lib/aws/aws.go | 4 +-- pkg/lib/aws/s3.go | 14 ++++----- pkg/operator/api/userconfig/apis.go | 41 +++++++++++++++++++++++++-- pkg/operator/api/userconfig/errors.go | 4 +-- pkg/workloads/cortex/tf_api/api.py | 10 +++++-- 5 files changed, 57 insertions(+), 16 deletions(-) diff --git a/pkg/lib/aws/aws.go b/pkg/lib/aws/aws.go index fa3ac4e76d..9dda79bfd4 100644 --- a/pkg/lib/aws/aws.go +++ b/pkg/lib/aws/aws.go @@ -31,7 +31,7 @@ import ( type Client struct { Region string Bucket string - s3Client *s3.S3 + S3 *s3.S3 stsClient *sts.STS cloudWatchLogsClient *cloudwatchlogs.CloudWatchLogs CloudWatchMetrics *cloudwatch.CloudWatch @@ -48,7 +48,7 @@ func New(region string, bucket string, withAccountID bool) (*Client, error) { awsClient := &Client{ Bucket: bucket, Region: region, - s3Client: s3.New(sess), + S3: s3.New(sess), stsClient: sts.New(sess), CloudWatchMetrics: cloudwatch.New(sess), cloudWatchLogsClient: cloudwatchlogs.New(sess), diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index 74f73cb957..85163eabeb 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -70,7 +70,7 @@ func S3PathJoin(paths ...string) string { func (c *Client) IsS3File(keys ...string) (bool, error) { for _, key := range keys { - _, err := c.s3Client.HeadObject(&s3.HeadObjectInput{ + _, err := c.S3.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(c.Bucket), Key: aws.String(key), }) @@ -88,7 +88,7 @@ func (c *Client) IsS3File(keys ...string) (bool, error) { func (c *Client) IsS3Prefix(prefixes ...string) (bool, error) { for _, prefix := range prefixes { - out, err := c.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + out, err := c.S3.ListObjectsV2(&s3.ListObjectsV2Input{ Bucket: aws.String(c.Bucket), Prefix: aws.String(prefix), }) @@ -138,7 +138,7 @@ func (c *Client) IsS3PathDir(s3Paths ...string) (bool, error) { } func (c *Client) UploadBytesToS3(data []byte, key string) error { - _, err := c.s3Client.PutObject(&s3.PutObjectInput{ + _, err := c.S3.PutObject(&s3.PutObjectInput{ Body: bytes.NewReader(data), Key: aws.String(key), Bucket: aws.String(c.Bucket), @@ -210,7 +210,7 @@ func (c *Client) ReadMsgpackFromS3(objPtr interface{}, key string) error { } func (c *Client) ReadStringFromS3(key string) (string, error) { - response, err := c.s3Client.GetObject(&s3.GetObjectInput{ + response, err := c.S3.GetObject(&s3.GetObjectInput{ Key: aws.String(key), Bucket: aws.String(c.Bucket), }) @@ -225,7 +225,7 @@ func (c *Client) ReadStringFromS3(key string) (string, error) { } func (c *Client) ReadBytesFromS3(key string) ([]byte, error) { - response, err := c.s3Client.GetObject(&s3.GetObjectInput{ + response, err := c.S3.GetObject(&s3.GetObjectInput{ Key: aws.String(key), Bucket: aws.String(c.Bucket), }) @@ -248,7 +248,7 @@ func (c *Client) DeleteFromS3ByPrefix(prefix string, continueIfFailure bool) err var subErr error - err := c.s3Client.ListObjectsV2Pages(listObjectsInput, + err := c.S3.ListObjectsV2Pages(listObjectsInput, func(listObjectsOutput *s3.ListObjectsV2Output, lastPage bool) bool { deleteObjects := make([]*s3.ObjectIdentifier, len(listObjectsOutput.Contents)) for i, object := range listObjectsOutput.Contents { @@ -261,7 +261,7 @@ func (c *Client) DeleteFromS3ByPrefix(prefix string, continueIfFailure bool) err Quiet: aws.Bool(true), }, } - _, newSubErr := c.s3Client.DeleteObjects(deleteObjectsInput) + _, newSubErr := c.S3.DeleteObjects(deleteObjectsInput) if newSubErr != nil { subErr = newSubErr if !continueIfFailure { diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 86c1e47228..284d6efb06 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" + "github.com/aws/aws-sdk-go/service/s3" "github.com/cortexlabs/cortex/pkg/lib/aws" cr "github.com/cortexlabs/cortex/pkg/lib/configreader" "github.com/cortexlabs/cortex/pkg/lib/errors" @@ -129,6 +130,33 @@ func IsValidTensorFlowS3Directory(path string, awsClient *aws.Client) bool { return true } +func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, error) { + if IsValidTensorFlowS3Directory(path, awsClient) { + return "", nil + } + + bucket, prefix, err := aws.SplitS3Path(path) + if err != nil { + return "", err + } + + possiblePaths := make([]string, 0) + resp, _ := awsClient.S3.ListObjects(&s3.ListObjectsInput{ + Bucket: &bucket, + Prefix: &prefix, + }) + for _, key := range resp.Contents { + keyParts := strings.Split(*key.Key, "/") + possiblePath := bucket + "/" + strings.Join(keyParts[:len(keyParts)-1], "/") + if keyParts[len(keyParts)-1] == "saved_model.pb" && + IsValidTensorFlowS3Directory(possiblePath, awsClient) { + possiblePaths = append(possiblePaths, possiblePath) + } + } + + return possiblePaths[0], nil +} + func (api *API) UserConfigStr() string { var sb strings.Builder sb.WriteString(api.ResourceFields.UserConfigStr()) @@ -178,7 +206,8 @@ func (api *API) Validate() error { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } case TensorFlowModelFormat: - if !IsValidTensorFlowS3Directory(api.Model, awsClient) { + path, err := GetTFServingExportFromS3Path(api.Model, awsClient) + if path == "" || err != nil { return errors.Wrap(ErrorInvalidTensorflowDir(api.Model), Identify(api), ModelKey) } default: @@ -191,7 +220,15 @@ func (api *API) Validate() error { case IsValidTensorFlowS3Directory(api.Model, awsClient): api.ModelFormat = TensorFlowModelFormat default: - return errors.Wrap(ErrorUnableToInferModelFormat(api.Model), Identify(api)) + path, err := GetTFServingExportFromS3Path(api.Model, awsClient) + if err != nil { + return errors.Wrap(err, Identify(api), ModelKey) + } + if path == "" { + return errors.Wrap(ErrorUnableToInferModelFormat(api.Model), Identify(api)) + } + api.ModelFormat = TensorFlowModelFormat + api.Model = path } } diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index d7f1c5900d..2bd278798b 100644 --- a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -284,8 +284,8 @@ func ErrorExternalNotFound(path string) error { var onnxExpectedStructMessage = `For ONNX models, the path should end in .onnx` -var tfExpectedStructMessage = `For TensorFlow models, the path should be a directory with the following structure: - 1523423423/ (version prefix, usually a timestamp) +var tfExpectedStructMessage = `For TensorFlow models, the path contain a directory with the following structure: + 1523423423/ (Version prefix, usually a timestamp. If this is not present we'll make this 1 for you) ├── saved_model.pb └── variables/ ├── variables.index diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index bfa3815254..63e92b970d 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -314,12 +314,16 @@ def start(args): local_cache["api"] = api local_cache["ctx"] = ctx - if not os.path.isdir(args.model_dir): + if args.only_download: bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) s3_client = S3(bucket_name, client_config={}) s3_client.download_dir(prefix, args.model_dir) - - if args.only_download: + folder = os.listdir(args.model_dir)[0] + if not folder.isdigit(): + cur_dir = os.path.join(args.model_dir, folder) + rename_dir = os.path.join(args.model_dir, "1") + util.logger.info("{} is not a servable version directory, will rename to {}".format(cur_dir, rename_dir)) + os.rename(cur_dir, rename_dir) return if api.get("request_handler") is not None: From dfde0c4c909fde0befe3ec76d217334f80c74414 Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 23 Aug 2019 12:44:03 -0400 Subject: [PATCH 02/14] format --- pkg/workloads/cortex/tf_api/api.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 63e92b970d..66ae266c4e 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -322,7 +322,11 @@ def start(args): if not folder.isdigit(): cur_dir = os.path.join(args.model_dir, folder) rename_dir = os.path.join(args.model_dir, "1") - util.logger.info("{} is not a servable version directory, will rename to {}".format(cur_dir, rename_dir)) + util.logger.info( + "{} is not a servable version directory, will rename to {}".format( + cur_dir, rename_dir + ) + ) os.rename(cur_dir, rename_dir) return From b7201fd08f752607737f3627b43ab2362d47c99e Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Mon, 26 Aug 2019 09:07:32 -0400 Subject: [PATCH 03/14] add model download container --- Makefile | 2 + cortex.sh | 2 + dev/registry.sh | 2 + docs/cluster/config.md | 1 + docs/cluster/development.md | 1 + images/model-download/Dockerfile | 23 +++++++ manager/install_cortex.sh | 1 + pkg/operator/config/config.go | 2 + pkg/operator/workloads/api_workload.go | 5 +- .../cortex/model_download/download.py | 68 +++++++++++++++++++ pkg/workloads/cortex/tf_api/api.py | 22 ------ 11 files changed, 103 insertions(+), 26 deletions(-) create mode 100644 images/model-download/Dockerfile create mode 100644 pkg/workloads/cortex/model_download/download.py diff --git a/Makefile b/Makefile index 8581b1ef79..4c6167f877 100644 --- a/Makefile +++ b/Makefile @@ -155,6 +155,7 @@ ci-build-images: @./build/build-image.sh images/istio-pilot istio-pilot @./build/build-image.sh images/istio-proxy istio-proxy @./build/build-image.sh images/istio-mixer istio-mixer + @./build/build-image.sh images/model-download model-download ci-push-images: @./build/push-image.sh manager @@ -174,6 +175,7 @@ ci-push-images: @./build/push-image.sh istio-pilot @./build/push-image.sh istio-proxy @./build/push-image.sh istio-mixer + @./build/push-image.sh model-download ci-build-cli: diff --git a/cortex.sh b/cortex.sh index 04627dda9e..845942833f 100755 --- a/cortex.sh +++ b/cortex.sh @@ -146,6 +146,7 @@ export CORTEX_IMAGE_ISTIO_GALLEY="${CORTEX_IMAGE_ISTIO_GALLEY:-cortexlabs/istio- export CORTEX_IMAGE_ISTIO_PILOT="${CORTEX_IMAGE_ISTIO_PILOT:-cortexlabs/istio-pilot:$CORTEX_VERSION_STABLE}" export CORTEX_IMAGE_ISTIO_PROXY="${CORTEX_IMAGE_ISTIO_PROXY:-cortexlabs/istio-proxy:$CORTEX_VERSION_STABLE}" export CORTEX_IMAGE_ISTIO_MIXER="${CORTEX_IMAGE_ISTIO_MIXER:-cortexlabs/istio-mixer:$CORTEX_VERSION_STABLE}" +export CORTEX_IMAGE_MODEL_DOWNLOAD="${CORTEX_IMAGE_MODEL_DOWNLOAD:-cortexlabs/model-download:$CORTEX_VERSION_STABLE}" export CORTEX_ENABLE_TELEMETRY="${CORTEX_ENABLE_TELEMETRY:-""}" @@ -205,6 +206,7 @@ function install_cortex() { -e CORTEX_IMAGE_ISTIO_PILOT=$CORTEX_IMAGE_ISTIO_PILOT \ -e CORTEX_IMAGE_ISTIO_PROXY=$CORTEX_IMAGE_ISTIO_PROXY \ -e CORTEX_IMAGE_ISTIO_MIXER=$CORTEX_IMAGE_ISTIO_MIXER \ + -e CORTEX_IMAGE_MODEL_DOWNLOAD=$CORTEX_IMAGE_MODEL_DOWNLOAD \ -e CORTEX_ENABLE_TELEMETRY=$CORTEX_ENABLE_TELEMETRY \ $CORTEX_IMAGE_MANAGER } diff --git a/dev/registry.sh b/dev/registry.sh index d17226dd0e..e5253055e5 100755 --- a/dev/registry.sh +++ b/dev/registry.sh @@ -52,6 +52,7 @@ function create_registry() { aws ecr create-repository --repository-name=cortexlabs/cluster-autoscaler --region=$REGISTRY_REGION || true aws ecr create-repository --repository-name=cortexlabs/nvidia --region=$REGISTRY_REGION || true aws ecr create-repository --repository-name=cortexlabs/metrics-server --region=$REGISTRY_REGION || true + aws ecr create-repository --repository-name=cortexlabs/model-download --region=$REGISTRY_REGION || true } ### HELPERS ### @@ -141,6 +142,7 @@ elif [ "$cmd" = "update" ]; then fi build_and_push $ROOT/images/tf-api tf-api latest + build_and_push $ROOT/images/model-download model-download latest build_and_push $ROOT/images/onnx-serve onnx-serve latest cleanup diff --git a/docs/cluster/config.md b/docs/cluster/config.md index a5494c4043..c4bb650b63 100644 --- a/docs/cluster/config.md +++ b/docs/cluster/config.md @@ -60,6 +60,7 @@ export CORTEX_IMAGE_ISTIO_MIXER="cortexlabs/istio-mixer:master" export CORTEX_IMAGE_ISTIO_PILOT="cortexlabs/istio-pilot:master" export CORTEX_IMAGE_ISTIO_CITADEL="cortexlabs/istio-citadel:master" export CORTEX_IMAGE_ISTIO_GALLEY="cortexlabs/istio-galley:master" +export CORTEX_IMAGE_MODEL_DOWNLOAD="cortexlabs/model-download:master" # Flag to enable collecting error reports and usage stats. If flag is not set to either "true" or "false", you will be prompted. export CORTEX_ENABLE_TELEMETRY="" diff --git a/docs/cluster/development.md b/docs/cluster/development.md index 4a31f6b8ee..725ca05daa 100644 --- a/docs/cluster/development.md +++ b/docs/cluster/development.md @@ -78,6 +78,7 @@ export CORTEX_IMAGE_ISTIO_MIXER="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortex export CORTEX_IMAGE_ISTIO_PILOT="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/istio-pilot:latest" export CORTEX_IMAGE_ISTIO_CITADEL="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/istio-citadel:latest" export CORTEX_IMAGE_ISTIO_GALLEY="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/istio-galley:latest" +export CORTEX_IMAGE_MODEL_DOWNLOAD="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/model-download:latest" export CORTEX_ENABLE_TELEMETRY="false" ``` diff --git a/images/model-download/Dockerfile b/images/model-download/Dockerfile new file mode 100644 index 0000000000..5e3e5d5d00 --- /dev/null +++ b/images/model-download/Dockerfile @@ -0,0 +1,23 @@ +FROM ubuntu:16.04 + +ENV PYTHONPATH="/src:${PYTHONPATH}" + +RUN apt-get update -qq && apt-get install -y -q \ + python3 \ + python3-dev \ + python3-pip \ + && apt-get clean -qq && rm -rf /var/lib/apt/lists/* && \ + pip3 install --upgrade \ + pip \ + setuptools \ + && rm -rf /root/.cache/pip* + +COPY pkg/workloads/cortex/lib/requirements.txt /src/cortex/lib/requirements.txt +RUN pip3 install -r /src/cortex/lib/requirements.txt && \ + rm -rf /root/.cache/pip* + +COPY pkg/workloads/cortex/consts.py /src/cortex/ +COPY pkg/workloads/cortex/lib /src/cortex/lib +COPY pkg/workloads/cortex/model_download /src/cortex/model_download + +ENTRYPOINT ["/usr/bin/python3", "/src/cortex/model_download/download.py"] diff --git a/manager/install_cortex.sh b/manager/install_cortex.sh index bd8d93c422..0375e7bc72 100755 --- a/manager/install_cortex.sh +++ b/manager/install_cortex.sh @@ -64,6 +64,7 @@ function setup_configmap() { --from-literal='IMAGE_ONNX_SERVE'=$CORTEX_IMAGE_ONNX_SERVE \ --from-literal='IMAGE_ONNX_SERVE_GPU'=$CORTEX_IMAGE_ONNX_SERVE_GPU \ --from-literal='IMAGE_TF_API'=$CORTEX_IMAGE_TF_API \ + --from-literal='IMAGE_MODEL_DOWNLOAD'=$CORTEX_IMAGE_MODEL_DOWNLOAD \ --from-literal='IMAGE_PYTHON_PACKAGER'=$CORTEX_IMAGE_PYTHON_PACKAGER \ --from-literal='IMAGE_TF_SERVE_GPU'=$CORTEX_IMAGE_TF_SERVE_GPU \ --from-literal='ENABLE_TELEMETRY'=$CORTEX_ENABLE_TELEMETRY \ diff --git a/pkg/operator/config/config.go b/pkg/operator/config/config.go index c18cc8a98c..9af612d81c 100644 --- a/pkg/operator/config/config.go +++ b/pkg/operator/config/config.go @@ -46,6 +46,7 @@ type CortexConfig struct { OperatorImage string `json:"operator_image"` TFServeImage string `json:"tf_serve_image"` TFAPIImage string `json:"tf_api_image"` + ModelDownloadImage string `json:"model_download_image"` PythonPackagerImage string `json:"python_packager_image"` TFServeImageGPU string `json:"tf_serve_image_gpu"` ONNXServeImage string `json:"onnx_serve_image"` @@ -66,6 +67,7 @@ func Init() error { OperatorImage: getStr("IMAGE_OPERATOR"), TFServeImage: getStr("IMAGE_TF_SERVE"), TFAPIImage: getStr("IMAGE_TF_API"), + ModelDownloadImage: getStr("IMAGE_MODEL_DOWNLOAD"), PythonPackagerImage: getStr("IMAGE_PYTHON_PACKAGER"), TFServeImageGPU: getStr("IMAGE_TF_SERVE_GPU"), ONNXServeImage: getStr("IMAGE_ONNX_SERVE"), diff --git a/pkg/operator/workloads/api_workload.go b/pkg/operator/workloads/api_workload.go index caf9b1f473..010a2d7ff7 100644 --- a/pkg/operator/workloads/api_workload.go +++ b/pkg/operator/workloads/api_workload.go @@ -283,17 +283,14 @@ func tfAPISpec( InitContainers: []kcore.Container{ { Name: modelDownloadInitContainerName, - Image: config.Cortex.TFAPIImage, + Image: config.Cortex.ModelDownloadImage, ImagePullPolicy: "Always", Args: []string{ "--workload-id=" + workloadID, - "--port=" + defaultPortStr, - "--tf-serve-port=" + tfServingPortStr, "--context=" + config.AWS.S3Path(ctx.Key), "--api=" + ctx.APIs[api.Name].ID, "--model-dir=" + path.Join(consts.EmptyDirMountPath, "model"), "--cache-dir=" + consts.ContextCacheDir, - "--only-download=true", }, Env: k8s.AWSCredentials(), VolumeMounts: k8s.DefaultVolumeMounts(), diff --git a/pkg/workloads/cortex/model_download/download.py b/pkg/workloads/cortex/model_download/download.py new file mode 100644 index 0000000000..3e2630a034 --- /dev/null +++ b/pkg/workloads/cortex/model_download/download.py @@ -0,0 +1,68 @@ +# Copyright 2019 Cortex Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import os +import argparse +import time + +from cortex.lib import Context, api_utils +from cortex.lib.storage import S3 +from cortex.lib.log import get_logger +from cortex.lib.exceptions import UserRuntimeException, UserException + +logger = get_logger() +logger.propagate = False # prevent double logging (flask modifies root logger) + +local_cache = {"ctx": None, "stub": None, "api": None, "metadata": None, "request_handler": None} + +def start(args): + api = None + try: + ctx = Context(s3_path=args.context, cache_dir=args.cache_dir, workload_id=args.workload_id) + api = ctx.apis_id_map[args.api] + local_cache["ctx"] = ctx + + if not os.path.isdir(args.model_dir): + bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) + s3_client = S3(bucket_name, client_config={}) + s3_client.download_dir(prefix, args.model_dir) + + except Exception as e: + logger.exception( + "An error occurred, see `cortex logs -v api {}` for more details.".format(api["name"]) + ) + sys.exit(1) + + +def main(): + parser = argparse.ArgumentParser() + na = parser.add_argument_group("required named arguments") + na.add_argument("--workload-id", required=True, help="Workload ID") + na.add_argument( + "--context", + required=True, + help="S3 path to context (e.g. s3://bucket/path/to/context.json)", + ) + na.add_argument("--api", required=True, help="Resource id of api to serve") + na.add_argument("--model-dir", required=True, help="Directory to download the model to") + na.add_argument("--cache-dir", required=True, help="Local path for the context cache") + parser.set_defaults(func=start) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 66ae266c4e..2243e04290 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -314,22 +314,6 @@ def start(args): local_cache["api"] = api local_cache["ctx"] = ctx - if args.only_download: - bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) - s3_client = S3(bucket_name, client_config={}) - s3_client.download_dir(prefix, args.model_dir) - folder = os.listdir(args.model_dir)[0] - if not folder.isdigit(): - cur_dir = os.path.join(args.model_dir, folder) - rename_dir = os.path.join(args.model_dir, "1") - util.logger.info( - "{} is not a servable version directory, will rename to {}".format( - cur_dir, rename_dir - ) - ) - os.rename(cur_dir, rename_dir) - return - if api.get("request_handler") is not None: package.install_packages(ctx.python_packages, ctx.storage) local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) @@ -391,12 +375,6 @@ def main(): na.add_argument("--api", required=True, help="Resource id of api to serve") na.add_argument("--model-dir", required=True, help="Directory to download the model to") na.add_argument("--cache-dir", required=True, help="Local path for the context cache") - na.add_argument( - "--only-download", - required=False, - help="Only download model (for init-containers)", - default=False, - ) parser.set_defaults(func=start) args = parser.parse_args() From 9e4c829ab2807d612dc2199168ef0b87a3db89f3 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Mon, 26 Aug 2019 09:12:51 -0400 Subject: [PATCH 04/14] format --- pkg/workloads/cortex/model_download/download.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/workloads/cortex/model_download/download.py b/pkg/workloads/cortex/model_download/download.py index 3e2630a034..9c7f17b7d4 100644 --- a/pkg/workloads/cortex/model_download/download.py +++ b/pkg/workloads/cortex/model_download/download.py @@ -27,6 +27,7 @@ local_cache = {"ctx": None, "stub": None, "api": None, "metadata": None, "request_handler": None} + def start(args): api = None try: From 9057ae6689e9a63103606c733562015a0e1f026a Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Mon, 26 Aug 2019 15:26:11 -0400 Subject: [PATCH 05/14] update packaging doc --- docs/deployments/packaging-models.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/deployments/packaging-models.md b/docs/deployments/packaging-models.md index 61f590231c..0bca2834ce 100644 --- a/docs/deployments/packaging-models.md +++ b/docs/deployments/packaging-models.md @@ -2,7 +2,7 @@ ## TensorFlow -Export your trained model and zip the model directory. An example is shown below (here is the [complete example](https://github.com/cortexlabs/cortex/blob/master/examples/iris/models/tensorflow_model.py)): +Export your trained model and upload the checkpoint directory. An example is shown below (here is the [complete example](https://github.com/cortexlabs/cortex/blob/master/examples/iris/models/tensorflow_model.py)): ```Python import tensorflow as tf @@ -22,10 +22,10 @@ eval_spec = tf.estimator.EvalSpec(eval_input_fn, exporters=[exporter], name="est tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec) ``` -Upload the exported version directory to Amazon S3 using the AWS web console or CLI: +Upload the checkpoint directory to Amazon S3 using the AWS web console or CLI: ```text -$ aws s3 sync ./iris/export/estimator/156293432 s3://my-bucket/iris/156293432 +$ aws s3 sync ./iris s3://my-bucket/iris ``` Reference your model in an `api`: @@ -33,7 +33,7 @@ Reference your model in an `api`: ```yaml - kind: api name: my-api - model: s3://my-bucket/iris/156293432 + model: s3://my-bucket/iris ``` ## ONNX From 0e3e4a34850781ae6ed75374e61391ae43849738 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Mon, 26 Aug 2019 15:38:10 -0400 Subject: [PATCH 06/14] updating packaging docs --- docs/deployments/packaging-models.md | 31 ++++++++++++++++------------ 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/docs/deployments/packaging-models.md b/docs/deployments/packaging-models.md index 0bca2834ce..d2fcf0d56e 100644 --- a/docs/deployments/packaging-models.md +++ b/docs/deployments/packaging-models.md @@ -2,7 +2,7 @@ ## TensorFlow -Export your trained model and upload the checkpoint directory. An example is shown below (here is the [complete example](https://github.com/cortexlabs/cortex/blob/master/examples/iris/models/tensorflow_model.py)): +Export your trained model and upload the checkpoint directory. An example is shown below (here is the [complete example](https://github.com/cortexlabs/cortex/blob/master/examples/sentiment)): ```Python import tensorflow as tf @@ -10,22 +10,27 @@ import shutil import os ... - -classifier = tf.estimator.Estimator( - model_fn=my_model, model_dir="iris", params={"hidden_units": [10, 10], "n_classes": 3} -) - -exporter = tf.estimator.FinalExporter("estimator", serving_input_fn, as_text=False) -train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=1000) -eval_spec = tf.estimator.EvalSpec(eval_input_fn, exporters=[exporter], name="estimator-eval") - -tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec) +OUPUT_DIR="bert" +estimator = tf.estimator.Estimator(model_fn=model_fn...) + +# TF Serving requires a special input_fn used at serving time +def serving_input_fn(): + inputs = tf.placeholder(shape=[128], dtype=tf.int32) + features = { + "input_ids": tf.expand_dims(inputs, 0), + "input_mask": tf.expand_dims(inputs, 0), + "segment_ids": tf.expand_dims(inputs, 0), + "label_ids": tf.placeholder(shape=[0], dtype=tf.int32), + } + return tf.estimator.export.ServingInputReceiver(features=features, receiver_tensors=inputs) + +estimator.export_savedmodel(OUPUT_DIR, serving_input_fn, strip_default_attrs=True) ``` Upload the checkpoint directory to Amazon S3 using the AWS web console or CLI: ```text -$ aws s3 sync ./iris s3://my-bucket/iris +$ aws s3 sync ./bert s3://my-bucket/bert ``` Reference your model in an `api`: @@ -33,7 +38,7 @@ Reference your model in an `api`: ```yaml - kind: api name: my-api - model: s3://my-bucket/iris + model: s3://my-bucket/bert ``` ## ONNX From 0973c6dfa6e19a7b2479059a4fbed84e08f77762 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Tue, 27 Aug 2019 12:55:33 -0400 Subject: [PATCH 07/14] address comments --- images/model-download/Dockerfile | 2 +- pkg/operator/api/userconfig/apis.go | 16 ++++++++-------- pkg/operator/api/userconfig/errors.go | 4 ++-- pkg/workloads/cortex/model_download/download.py | 10 +++------- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/images/model-download/Dockerfile b/images/model-download/Dockerfile index 5e3e5d5d00..6cdfcf9ce2 100644 --- a/images/model-download/Dockerfile +++ b/images/model-download/Dockerfile @@ -1,4 +1,4 @@ -FROM ubuntu:16.04 +FROM ubuntu:18.04 ENV PYTHONPATH="/src:${PYTHONPATH}" diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 284d6efb06..116899fd3e 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -132,7 +132,7 @@ func IsValidTensorFlowS3Directory(path string, awsClient *aws.Client) bool { func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, error) { if IsValidTensorFlowS3Directory(path, awsClient) { - return "", nil + return path, nil } bucket, prefix, err := aws.SplitS3Path(path) @@ -140,21 +140,23 @@ func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, e return "", err } - possiblePaths := make([]string, 0) resp, _ := awsClient.S3.ListObjects(&s3.ListObjectsInput{ Bucket: &bucket, Prefix: &prefix, }) for _, key := range resp.Contents { + if !strings.HasSuffix(*key.Key, "saved_model.pb") { + continue + } + keyParts := strings.Split(*key.Key, "/") possiblePath := bucket + "/" + strings.Join(keyParts[:len(keyParts)-1], "/") - if keyParts[len(keyParts)-1] == "saved_model.pb" && - IsValidTensorFlowS3Directory(possiblePath, awsClient) { - possiblePaths = append(possiblePaths, possiblePath) + if IsValidTensorFlowS3Directory(possiblePath, awsClient) { + return possiblePath, nil } } - return possiblePaths[0], nil + return "", nil } func (api *API) UserConfigStr() string { @@ -217,8 +219,6 @@ func (api *API) Validate() error { if ok, err := awsClient.IsS3PathFile(api.Model); err != nil || !ok { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } - case IsValidTensorFlowS3Directory(api.Model, awsClient): - api.ModelFormat = TensorFlowModelFormat default: path, err := GetTFServingExportFromS3Path(api.Model, awsClient) if err != nil { diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index 2bd278798b..303e1923a9 100644 --- a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -284,8 +284,8 @@ func ErrorExternalNotFound(path string) error { var onnxExpectedStructMessage = `For ONNX models, the path should end in .onnx` -var tfExpectedStructMessage = `For TensorFlow models, the path contain a directory with the following structure: - 1523423423/ (Version prefix, usually a timestamp. If this is not present we'll make this 1 for you) +var tfExpectedStructMessage = `For TensorFlow models, the path must contain a directory with the following structure: + 1523423423/ (Version prefix, usually a timestamp) ├── saved_model.pb └── variables/ ├── variables.index diff --git a/pkg/workloads/cortex/model_download/download.py b/pkg/workloads/cortex/model_download/download.py index 9c7f17b7d4..9b03fdee82 100644 --- a/pkg/workloads/cortex/model_download/download.py +++ b/pkg/workloads/cortex/model_download/download.py @@ -25,20 +25,16 @@ logger = get_logger() logger.propagate = False # prevent double logging (flask modifies root logger) -local_cache = {"ctx": None, "stub": None, "api": None, "metadata": None, "request_handler": None} - def start(args): api = None try: ctx = Context(s3_path=args.context, cache_dir=args.cache_dir, workload_id=args.workload_id) api = ctx.apis_id_map[args.api] - local_cache["ctx"] = ctx - if not os.path.isdir(args.model_dir): - bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) - s3_client = S3(bucket_name, client_config={}) - s3_client.download_dir(prefix, args.model_dir) + bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) + s3_client = S3(bucket_name, client_config={}) + s3_client.download_dir(prefix, args.model_dir) except Exception as e: logger.exception( From 48c3bbf665037a460b898e2b494f7479e2cfd97a Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Tue, 27 Aug 2019 12:57:16 -0400 Subject: [PATCH 08/14] update packaging docs --- docs/deployments/packaging-models.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/deployments/packaging-models.md b/docs/deployments/packaging-models.md index d2fcf0d56e..dc06c9ff1d 100644 --- a/docs/deployments/packaging-models.md +++ b/docs/deployments/packaging-models.md @@ -2,7 +2,7 @@ ## TensorFlow -Export your trained model and upload the checkpoint directory. An example is shown below (here is the [complete example](https://github.com/cortexlabs/cortex/blob/master/examples/sentiment)): +Export your trained model and upload the export directory, or checkpoint directory containing the export directory, which is usually the case if you used `estimator.train_and_evaluate`. An example is shown below (here is the [complete example](https://github.com/cortexlabs/cortex/blob/master/examples/sentiment)): ```Python import tensorflow as tf From a432e8251c234b7bdfd4c991ae566ad980117a52 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Tue, 27 Aug 2019 13:54:41 -0400 Subject: [PATCH 09/14] fix error --- pkg/lib/aws/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index f7388b3e17..c5a51239b3 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -246,7 +246,7 @@ func (c *Client) ListPrefix(prefix string, maxResults int64) ([]*s3.Object, erro MaxKeys: aws.Int64(maxResults), } - output, err := c.s3Client.ListObjectsV2(listObjectsInput) + output, err := c.S3.ListObjectsV2(listObjectsInput) if err != nil { return nil, errors.Wrap(err, prefix) } From 815414d62d11fe253b42535fc5d85a8a95dc9702 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Tue, 27 Aug 2019 14:29:18 -0400 Subject: [PATCH 10/14] update imagenet example, fix bug --- examples/image-classifier/cortex.yaml | 2 +- pkg/operator/api/userconfig/apis.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/image-classifier/cortex.yaml b/examples/image-classifier/cortex.yaml index 90d8217b62..b37ee58131 100644 --- a/examples/image-classifier/cortex.yaml +++ b/examples/image-classifier/cortex.yaml @@ -3,5 +3,5 @@ - kind: api name: classifier - model: s3://cortex-examples/imagenet/1566492692 + model: s3://cortex-examples/imagenet/ request_handler: imagenet.py diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index f88bef23e5..7a64440f36 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -18,6 +18,7 @@ package userconfig import ( "fmt" + "path/filepath" "strings" "github.com/aws/aws-sdk-go/service/s3" @@ -168,7 +169,7 @@ func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, e } keyParts := strings.Split(*key.Key, "/") - possiblePath := bucket + "/" + strings.Join(keyParts[:len(keyParts)-1], "/") + possiblePath := "s3://" + filepath.Join(bucket, filepath.Join(keyParts[:len(keyParts)-1]...)) if IsValidTensorFlowS3Directory(possiblePath, awsClient) { return possiblePath, nil } From bcf5166b0f91b65e7c1049a3b2cd2311339f1b3a Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Tue, 27 Aug 2019 15:47:41 -0400 Subject: [PATCH 11/14] make model-download more generic --- Makefile | 4 +-- cortex.sh | 4 +-- dev/registry.sh | 4 +-- docs/cluster/config.md | 2 +- docs/cluster/development.md | 2 +- .../{model-download => downloader}/Dockerfile | 4 +-- manager/install_cortex.sh | 2 +- pkg/operator/config/config.go | 4 +-- pkg/operator/workloads/api_workload.go | 30 +++++++------------ .../download.py | 28 ++++------------- pkg/workloads/cortex/lib/storage/s3.py | 6 ++++ pkg/workloads/cortex/onnx_serve/api.py | 9 +----- 12 files changed, 36 insertions(+), 63 deletions(-) rename images/{model-download => downloader}/Dockerfile (80%) rename pkg/workloads/cortex/{model_download => downloader}/download.py (53%) diff --git a/Makefile b/Makefile index d32578e58e..bb3b68449d 100644 --- a/Makefile +++ b/Makefile @@ -154,7 +154,7 @@ ci-build-images: @./build/build-image.sh images/istio-galley istio-galley @./build/build-image.sh images/istio-pilot istio-pilot @./build/build-image.sh images/istio-proxy istio-proxy - @./build/build-image.sh images/model-download model-download + @./build/build-image.sh images/downloader downloader ci-push-images: @./build/push-image.sh manager @@ -173,7 +173,7 @@ ci-push-images: @./build/push-image.sh istio-galley @./build/push-image.sh istio-pilot @./build/push-image.sh istio-proxy - @./build/push-image.sh model-download + @./build/push-image.sh downloader ci-build-cli: diff --git a/cortex.sh b/cortex.sh index e8a2fcb047..39c99e455e 100755 --- a/cortex.sh +++ b/cortex.sh @@ -145,7 +145,7 @@ export CORTEX_IMAGE_ISTIO_CITADEL="${CORTEX_IMAGE_ISTIO_CITADEL:-cortexlabs/isti export CORTEX_IMAGE_ISTIO_GALLEY="${CORTEX_IMAGE_ISTIO_GALLEY:-cortexlabs/istio-galley:$CORTEX_VERSION_STABLE}" export CORTEX_IMAGE_ISTIO_PILOT="${CORTEX_IMAGE_ISTIO_PILOT:-cortexlabs/istio-pilot:$CORTEX_VERSION_STABLE}" export CORTEX_IMAGE_ISTIO_PROXY="${CORTEX_IMAGE_ISTIO_PROXY:-cortexlabs/istio-proxy:$CORTEX_VERSION_STABLE}" -export CORTEX_IMAGE_MODEL_DOWNLOAD="${CORTEX_IMAGE_MODEL_DOWNLOAD:-cortexlabs/model-download:$CORTEX_VERSION_STABLE}" +export CORTEX_IMAGE_DOWNLOADER="${CORTEX_IMAGE_DOWNLOADER:-cortexlabs/downloader:$CORTEX_VERSION_STABLE}" export CORTEX_ENABLE_TELEMETRY="${CORTEX_ENABLE_TELEMETRY:-""}" export CORTEX_TELEMETRY_URL="${CORTEX_TELEMETRY_URL:-"https://telemetry.cortexlabs.dev"}" @@ -205,7 +205,7 @@ function install_cortex() { -e CORTEX_IMAGE_ISTIO_GALLEY=$CORTEX_IMAGE_ISTIO_GALLEY \ -e CORTEX_IMAGE_ISTIO_PILOT=$CORTEX_IMAGE_ISTIO_PILOT \ -e CORTEX_IMAGE_ISTIO_PROXY=$CORTEX_IMAGE_ISTIO_PROXY \ - -e CORTEX_IMAGE_MODEL_DOWNLOAD=$CORTEX_IMAGE_MODEL_DOWNLOAD \ + -e CORTEX_IMAGE_DOWNLOADER=$CORTEX_IMAGE_DOWNLOADER \ -e CORTEX_ENABLE_TELEMETRY=$CORTEX_ENABLE_TELEMETRY \ $CORTEX_IMAGE_MANAGER } diff --git a/dev/registry.sh b/dev/registry.sh index 9bc385b10f..c83509a58c 100755 --- a/dev/registry.sh +++ b/dev/registry.sh @@ -51,7 +51,7 @@ function create_registry() { aws ecr create-repository --repository-name=cortexlabs/cluster-autoscaler --region=$REGISTRY_REGION || true aws ecr create-repository --repository-name=cortexlabs/nvidia --region=$REGISTRY_REGION || true aws ecr create-repository --repository-name=cortexlabs/metrics-server --region=$REGISTRY_REGION || true - aws ecr create-repository --repository-name=cortexlabs/model-download --region=$REGISTRY_REGION || true + aws ecr create-repository --repository-name=cortexlabs/downloader --region=$REGISTRY_REGION || true } ### HELPERS ### @@ -140,7 +140,7 @@ elif [ "$cmd" = "update" ]; then fi build_and_push $ROOT/images/tf-api tf-api latest - build_and_push $ROOT/images/model-download model-download latest + build_and_push $ROOT/images/downloader downloader latest build_and_push $ROOT/images/onnx-serve onnx-serve latest cleanup diff --git a/docs/cluster/config.md b/docs/cluster/config.md index 50b4211d3a..7689b28f34 100644 --- a/docs/cluster/config.md +++ b/docs/cluster/config.md @@ -59,7 +59,7 @@ export CORTEX_IMAGE_ISTIO_PROXY="cortexlabs/istio-proxy:master" export CORTEX_IMAGE_ISTIO_PILOT="cortexlabs/istio-pilot:master" export CORTEX_IMAGE_ISTIO_CITADEL="cortexlabs/istio-citadel:master" export CORTEX_IMAGE_ISTIO_GALLEY="cortexlabs/istio-galley:master" -export CORTEX_IMAGE_MODEL_DOWNLOAD="cortexlabs/model-download:master" +export CORTEX_IMAGE_DOWNLOADER="cortexlabs/downloader:master" # Flag to enable collecting error reports and usage stats. If flag is not set to either "true" or "false", you will be prompted. export CORTEX_ENABLE_TELEMETRY="" diff --git a/docs/cluster/development.md b/docs/cluster/development.md index 0571978c2a..67fc1f0c12 100644 --- a/docs/cluster/development.md +++ b/docs/cluster/development.md @@ -77,7 +77,7 @@ export CORTEX_IMAGE_ISTIO_PROXY="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortex export CORTEX_IMAGE_ISTIO_PILOT="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/istio-pilot:latest" export CORTEX_IMAGE_ISTIO_CITADEL="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/istio-citadel:latest" export CORTEX_IMAGE_ISTIO_GALLEY="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/istio-galley:latest" -export CORTEX_IMAGE_MODEL_DOWNLOAD="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/model-download:latest" +export CORTEX_IMAGE_DOWNLOADER="XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/downloader:latest" export CORTEX_ENABLE_TELEMETRY="false" ``` diff --git a/images/model-download/Dockerfile b/images/downloader/Dockerfile similarity index 80% rename from images/model-download/Dockerfile rename to images/downloader/Dockerfile index 6cdfcf9ce2..0c87a1f6f4 100644 --- a/images/model-download/Dockerfile +++ b/images/downloader/Dockerfile @@ -18,6 +18,6 @@ RUN pip3 install -r /src/cortex/lib/requirements.txt && \ COPY pkg/workloads/cortex/consts.py /src/cortex/ COPY pkg/workloads/cortex/lib /src/cortex/lib -COPY pkg/workloads/cortex/model_download /src/cortex/model_download +COPY pkg/workloads/cortex/downloader /src/cortex/downloader -ENTRYPOINT ["/usr/bin/python3", "/src/cortex/model_download/download.py"] +ENTRYPOINT ["/usr/bin/python3", "/src/cortex/downloader/download.py"] diff --git a/manager/install_cortex.sh b/manager/install_cortex.sh index 7dee6d154d..6f001c3809 100755 --- a/manager/install_cortex.sh +++ b/manager/install_cortex.sh @@ -64,7 +64,7 @@ function setup_configmap() { --from-literal='IMAGE_ONNX_SERVE'=$CORTEX_IMAGE_ONNX_SERVE \ --from-literal='IMAGE_ONNX_SERVE_GPU'=$CORTEX_IMAGE_ONNX_SERVE_GPU \ --from-literal='IMAGE_TF_API'=$CORTEX_IMAGE_TF_API \ - --from-literal='IMAGE_MODEL_DOWNLOAD'=$CORTEX_IMAGE_MODEL_DOWNLOAD \ + --from-literal='IMAGE_DOWNLOADER'=$CORTEX_IMAGE_DOWNLOADER \ --from-literal='IMAGE_PYTHON_PACKAGER'=$CORTEX_IMAGE_PYTHON_PACKAGER \ --from-literal='IMAGE_TF_SERVE_GPU'=$CORTEX_IMAGE_TF_SERVE_GPU \ --from-literal='ENABLE_TELEMETRY'=$CORTEX_ENABLE_TELEMETRY \ diff --git a/pkg/operator/config/config.go b/pkg/operator/config/config.go index b7db9c08fe..96778b4d9f 100644 --- a/pkg/operator/config/config.go +++ b/pkg/operator/config/config.go @@ -46,7 +46,7 @@ type CortexConfig struct { OperatorImage string `json:"operator_image"` TFServeImage string `json:"tf_serve_image"` TFAPIImage string `json:"tf_api_image"` - ModelDownloadImage string `json:"model_download_image"` + DownloaderImage string `json:"downloader_image"` PythonPackagerImage string `json:"python_packager_image"` TFServeImageGPU string `json:"tf_serve_image_gpu"` ONNXServeImage string `json:"onnx_serve_image"` @@ -67,7 +67,7 @@ func Init() error { OperatorImage: getStr("IMAGE_OPERATOR"), TFServeImage: getStr("IMAGE_TF_SERVE"), TFAPIImage: getStr("IMAGE_TF_API"), - ModelDownloadImage: getStr("IMAGE_MODEL_DOWNLOAD"), + DownloaderImage: getStr("IMAGE_DOWNLOADER"), PythonPackagerImage: getStr("IMAGE_PYTHON_PACKAGER"), TFServeImageGPU: getStr("IMAGE_TF_SERVE_GPU"), ONNXServeImage: getStr("IMAGE_ONNX_SERVE"), diff --git a/pkg/operator/workloads/api_workload.go b/pkg/operator/workloads/api_workload.go index 010a2d7ff7..0e4bc36e23 100644 --- a/pkg/operator/workloads/api_workload.go +++ b/pkg/operator/workloads/api_workload.go @@ -34,9 +34,9 @@ import ( ) const ( - apiContainerName = "api" - tfServingContainerName = "serve" - modelDownloadInitContainerName = "model-download" + apiContainerName = "api" + tfServingContainerName = "serve" + downloaderInitContainerName = "downloader" defaultPortInt32, defaultPortStr = int32(8888), "8888" tfServingPortInt32, tfServingPortStr = int32(9000), "9000" @@ -282,15 +282,12 @@ func tfAPISpec( RestartPolicy: "Always", InitContainers: []kcore.Container{ { - Name: modelDownloadInitContainerName, - Image: config.Cortex.ModelDownloadImage, + Name: downloaderInitContainerName, + Image: config.Cortex.DownloaderImage, ImagePullPolicy: "Always", Args: []string{ - "--workload-id=" + workloadID, - "--context=" + config.AWS.S3Path(ctx.Key), - "--api=" + ctx.APIs[api.Name].ID, - "--model-dir=" + path.Join(consts.EmptyDirMountPath, "model"), - "--cache-dir=" + consts.ContextCacheDir, + "--download_from=" + ctx.APIs[api.Name].Model, + "--download_to=" + path.Join(consts.EmptyDirMountPath, "model"), }, Env: k8s.AWSCredentials(), VolumeMounts: k8s.DefaultVolumeMounts(), @@ -430,17 +427,12 @@ func onnxAPISpec( K8sPodSpec: kcore.PodSpec{ InitContainers: []kcore.Container{ { - Name: modelDownloadInitContainerName, - Image: servingImage, + Name: downloaderInitContainerName, + Image: config.Cortex.DownloaderImage, ImagePullPolicy: "Always", Args: []string{ - "--workload-id=" + workloadID, - "--port=" + defaultPortStr, - "--context=" + config.AWS.S3Path(ctx.Key), - "--api=" + ctx.APIs[api.Name].ID, - "--model-dir=" + path.Join(consts.EmptyDirMountPath, "model"), - "--cache-dir=" + consts.ContextCacheDir, - "--only-download=true", + "--download_from=" + ctx.APIs[api.Name].Model, + "--download_to=" + path.Join(consts.EmptyDirMountPath, "model"), }, Env: k8s.AWSCredentials(), VolumeMounts: k8s.DefaultVolumeMounts(), diff --git a/pkg/workloads/cortex/model_download/download.py b/pkg/workloads/cortex/downloader/download.py similarity index 53% rename from pkg/workloads/cortex/model_download/download.py rename to pkg/workloads/cortex/downloader/download.py index 9b03fdee82..9012933ce7 100644 --- a/pkg/workloads/cortex/model_download/download.py +++ b/pkg/workloads/cortex/downloader/download.py @@ -27,34 +27,16 @@ def start(args): - api = None - try: - ctx = Context(s3_path=args.context, cache_dir=args.cache_dir, workload_id=args.workload_id) - api = ctx.apis_id_map[args.api] - - bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) - s3_client = S3(bucket_name, client_config={}) - s3_client.download_dir(prefix, args.model_dir) - - except Exception as e: - logger.exception( - "An error occurred, see `cortex logs -v api {}` for more details.".format(api["name"]) - ) - sys.exit(1) + bucket_name, prefix = S3.deconstruct_s3_path(args.download_from) + s3_client = S3(bucket_name, client_config={}) + s3_client.download(prefix, args.download_to) def main(): parser = argparse.ArgumentParser() na = parser.add_argument_group("required named arguments") - na.add_argument("--workload-id", required=True, help="Workload ID") - na.add_argument( - "--context", - required=True, - help="S3 path to context (e.g. s3://bucket/path/to/context.json)", - ) - na.add_argument("--api", required=True, help="Resource id of api to serve") - na.add_argument("--model-dir", required=True, help="Directory to download the model to") - na.add_argument("--cache-dir", required=True, help="Local path for the context cache") + na.add_argument("--download_from", required=True, help="Storage Path to download the file from") + na.add_argument("--download_to", required=True, help="Directory to download the file to") parser.set_defaults(func=start) args = parser.parse_args() diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 95e930c999..83d81d7d4d 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -225,3 +225,9 @@ def download_and_unzip(self, key, local_dir): local_zip = os.path.join(local_dir, "zip.zip") self.download_file(key, local_zip) util.extract_zip(local_zip, delete_zip_file=True) + + def download(self, prefix, local_dir): + if self._is_s3_dir(prefix): + self.download_dir(prefix, local_dir) + else: + self.download_file_to_dir(prefix, local_dir) diff --git a/pkg/workloads/cortex/onnx_serve/api.py b/pkg/workloads/cortex/onnx_serve/api.py index e0aefe45e3..fb9b9f3fb3 100644 --- a/pkg/workloads/cortex/onnx_serve/api.py +++ b/pkg/workloads/cortex/onnx_serve/api.py @@ -234,15 +234,8 @@ def start(args): local_cache["api"] = api local_cache["ctx"] = ctx - bucket_name, prefix = ctx.storage.deconstruct_s3_path(api["model"]) + _, prefix = ctx.storage.deconstruct_s3_path(api["model"]) model_path = os.path.join(args.model_dir, os.path.basename(prefix)) - if not os.path.exists(model_path): - s3_client = S3(bucket_name, client_config={}) - s3_client.download_file(prefix, model_path) - - if args.only_download: - return - if api.get("request_handler") is not None: package.install_packages(ctx.python_packages, ctx.storage) local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) From d13bfc4424ca65ab2a577dcb7431eacbfea9ee41 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 28 Aug 2019 12:31:52 -0400 Subject: [PATCH 12/14] address comments --- pkg/operator/api/userconfig/apis.go | 52 ++++++++++++--------- pkg/workloads/cortex/downloader/download.py | 6 --- pkg/workloads/cortex/onnx_serve/api.py | 6 --- 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 7a64440f36..8fe6040d12 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -19,6 +19,7 @@ package userconfig import ( "fmt" "path/filepath" + "strconv" "strings" "github.com/aws/aws-sdk-go/service/s3" @@ -163,19 +164,29 @@ func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, e Bucket: &bucket, Prefix: &prefix, }) + + highestVersion := int64(0) + var highestPath string for _, key := range resp.Contents { if !strings.HasSuffix(*key.Key, "saved_model.pb") { continue } keyParts := strings.Split(*key.Key, "/") + versionStr := keyParts[len(keyParts)] + version, err := strconv.ParseInt(versionStr, 10, 64) + if err != nil { + version = 0 + } + possiblePath := "s3://" + filepath.Join(bucket, filepath.Join(keyParts[:len(keyParts)-1]...)) - if IsValidTensorFlowS3Directory(possiblePath, awsClient) { - return possiblePath, nil + if IsValidTensorFlowS3Directory(possiblePath, awsClient) && version >= highestVersion { + highestVersion = version + highestPath = possiblePath } } - return "", nil + return highestPath, nil } func (api *API) UserConfigStr() string { @@ -221,34 +232,31 @@ func (api *API) Validate() error { return err } - switch api.ModelFormat { - case ONNXModelFormat: + switch { + case api.ModelFormat == ONNXModelFormat: if ok, err := awsClient.IsS3PathFile(api.Model); err != nil || !ok { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } - case TensorFlowModelFormat: + case api.ModelFormat == TensorFlowModelFormat: path, err := GetTFServingExportFromS3Path(api.Model, awsClient) if path == "" || err != nil { return errors.Wrap(ErrorInvalidTensorflowDir(api.Model), Identify(api), ModelKey) } + case strings.HasSuffix(api.Model, ".onnx"): + api.ModelFormat = ONNXModelFormat + if ok, err := awsClient.IsS3PathFile(api.Model); err != nil || !ok { + return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) + } default: - switch { - case strings.HasSuffix(api.Model, ".onnx"): - api.ModelFormat = ONNXModelFormat - if ok, err := awsClient.IsS3PathFile(api.Model); err != nil || !ok { - return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) - } - default: - path, err := GetTFServingExportFromS3Path(api.Model, awsClient) - if err != nil { - return errors.Wrap(err, Identify(api), ModelKey) - } - if path == "" { - return errors.Wrap(ErrorUnableToInferModelFormat(api.Model), Identify(api)) - } - api.ModelFormat = TensorFlowModelFormat - api.Model = path + path, err := GetTFServingExportFromS3Path(api.Model, awsClient) + if err != nil { + return errors.Wrap(err, Identify(api), ModelKey) + } + if path == "" { + return errors.Wrap(ErrorUnableToInferModelFormat(api.Model), Identify(api)) } + api.ModelFormat = TensorFlowModelFormat + api.Model = path } if api.ModelFormat == TensorFlowModelFormat && api.TFServing == nil { diff --git a/pkg/workloads/cortex/downloader/download.py b/pkg/workloads/cortex/downloader/download.py index 9012933ce7..5bb843cb21 100644 --- a/pkg/workloads/cortex/downloader/download.py +++ b/pkg/workloads/cortex/downloader/download.py @@ -12,18 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys -import os import argparse -import time -from cortex.lib import Context, api_utils from cortex.lib.storage import S3 from cortex.lib.log import get_logger -from cortex.lib.exceptions import UserRuntimeException, UserException logger = get_logger() -logger.propagate = False # prevent double logging (flask modifies root logger) def start(args): diff --git a/pkg/workloads/cortex/onnx_serve/api.py b/pkg/workloads/cortex/onnx_serve/api.py index fb9b9f3fb3..2c7f1ba601 100644 --- a/pkg/workloads/cortex/onnx_serve/api.py +++ b/pkg/workloads/cortex/onnx_serve/api.py @@ -277,12 +277,6 @@ def main(): na.add_argument("--api", required=True, help="Resource id of api to serve") na.add_argument("--model-dir", required=True, help="Directory to download the model to") na.add_argument("--cache-dir", required=True, help="Local path for the context cache") - na.add_argument( - "--only-download", - required=False, - help="Only download model (for init-containers)", - default=False, - ) parser.set_defaults(func=start) From c3c5b05c75343710b8f00486a5b5b7aa7c1ac002 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 28 Aug 2019 12:38:28 -0400 Subject: [PATCH 13/14] fix bug --- pkg/operator/api/userconfig/apis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 8fe6040d12..2d219b4b65 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -173,7 +173,7 @@ func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, e } keyParts := strings.Split(*key.Key, "/") - versionStr := keyParts[len(keyParts)] + versionStr := keyParts[len(keyParts)-1] version, err := strconv.ParseInt(versionStr, 10, 64) if err != nil { version = 0 From 32f56d223bca73af75fb008c40aa72838546401b Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 28 Aug 2019 11:59:49 -0700 Subject: [PATCH 14/14] Short circuit if check --- pkg/operator/api/userconfig/apis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 2d219b4b65..3421c9f742 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -180,7 +180,7 @@ func GetTFServingExportFromS3Path(path string, awsClient *aws.Client) (string, e } possiblePath := "s3://" + filepath.Join(bucket, filepath.Join(keyParts[:len(keyParts)-1]...)) - if IsValidTensorFlowS3Directory(possiblePath, awsClient) && version >= highestVersion { + if version >= highestVersion && IsValidTensorFlowS3Directory(possiblePath, awsClient) { highestVersion = version highestPath = possiblePath }