From ee20c725cd08ff2a65bedecb7d9b2cea9aef7dc7 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Tue, 13 Aug 2019 14:26:15 -0400 Subject: [PATCH 01/26] progress --- examples/spell/cortex.yaml | 12 ++++++++++ examples/spell/request.py | 35 +++++++++++++++++++++++++++++ pkg/lib/models/models.go | 11 +++++++++ pkg/operator/api/userconfig/apis.go | 3 ++- 4 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 examples/spell/cortex.yaml create mode 100644 examples/spell/request.py create mode 100644 pkg/lib/models/models.go diff --git a/examples/spell/cortex.yaml b/examples/spell/cortex.yaml new file mode 100644 index 0000000000..c25b940391 --- /dev/null +++ b/examples/spell/cortex.yaml @@ -0,0 +1,12 @@ +- kind: deployment + name: spell + +- kind: api + name: spell + model: s3://cortex-yolo/spell197.zip + compute: + min_replicas: 1 + max_replicas: 1 + cpu: "3" + gpu: 1 + mem: "48Gi" diff --git a/examples/spell/request.py b/examples/spell/request.py new file mode 100644 index 0000000000..b744937286 --- /dev/null +++ b/examples/spell/request.py @@ -0,0 +1,35 @@ +import numpy as np +import requests +from PIL import Image + +TF_SERVING_API = ( + "https://acea1a9abbdca11e9b29c0ea6849e7fb-276107737.us-west-2.elb.amazonaws.com/spell/spell" +) + + +def load_image(path): + img = Image.open(path) + img.load() + data = np.asarray(img, dtype="uint8") + data = np.delete(data, 0, 2) + data = np.expand_dims(data, axis=0) + return data + + +def load_zeros(): + return np.zeros((1, 1024, 768), dtype="uint8") + + +# image = load_image("./mower.jpg") +# image = load_image("./mower2.jpg") +image = load_zeros() + +print(image.shape, image.dtype) + +predict_body = {"samples": [{"images": [image.tolist()]}]} + +print("starting request") +result = requests.post(TF_SERVING_API, json=predict_body, verify=False) +print("finished request") + +print(result.text) diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go new file mode 100644 index 0000000000..7e8c16ffe8 --- /dev/null +++ b/pkg/lib/models/models.go @@ -0,0 +1,11 @@ +package models + +import "strings" + +func IsValidS3Directory(path string) bool { + if strings.HasSuffix(path, ".zip") { + return true + } + + return false +} diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index a2fc5564f5..5100fd4c15 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -25,6 +25,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/aws" cr "github.com/cortexlabs/cortex/pkg/lib/configreader" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/models" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/api/resource" ) @@ -126,7 +127,7 @@ func (api *API) Validate() error { if api.ModelFormat == UnknownModelFormat { if strings.HasSuffix(api.Model, ".onnx") { api.ModelFormat = ONNXModelFormat - } else if strings.HasSuffix(api.Model, ".zip") { + } else if models.IsValidS3Directory(api.Model) { api.ModelFormat = TensorFlowModelFormat } else { return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) From 1090f1ca3fd341e2750e937c75f7c0445a0c78ef Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 12:19:37 -0400 Subject: [PATCH 02/26] support directories in s3 --- docs/apis/packaging-models.md | 9 ++---- examples/iris/cortex.yaml | 2 +- examples/iris/models/tensorflow_model.py | 4 --- examples/sentiment/cortex.yaml | 2 +- pkg/lib/aws/s3.go | 21 ++++++++++++ pkg/lib/models/models.go | 41 
+++++++++++++++++++++--- pkg/operator/api/userconfig/apis.go | 27 +++++++++------- pkg/workloads/cortex/lib/storage/s3.py | 16 +++++++++ pkg/workloads/cortex/tf_api/api.py | 10 +++--- 9 files changed, 98 insertions(+), 34 deletions(-) diff --git a/docs/apis/packaging-models.md b/docs/apis/packaging-models.md index f210f9913c..5629c2a68c 100644 --- a/docs/apis/packaging-models.md +++ b/docs/apis/packaging-models.md @@ -20,15 +20,12 @@ train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=1000) eval_spec = tf.estimator.EvalSpec(eval_input_fn, exporters=[exporter], name="estimator-eval") tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec) - -# zip the estimator export dir (the exported path looks like iris/export/estimator/1562353043/) -shutil.make_archive("tensorflow", "zip", os.path.join("iris/export/estimator")) ``` -Upload the zipped file to Amazon S3 using the AWS web console or CLI: +Upload the exported timestamp directory to Amazon S3 using the AWS web console or CLI: ```text -$ aws s3 cp model.zip s3://my-bucket/model.zip +$ aws s3 sync ./iris/export/estimator/156293432 s3://my-bucket/156293432 ``` Reference your model in an `api`: @@ -36,7 +33,7 @@ Reference your model in an `api`: ```yaml - kind: api name: my-api - model: s3://my-bucket/model.zip + model: s3://my-bucket/156293432 ``` ## ONNX diff --git a/examples/iris/cortex.yaml b/examples/iris/cortex.yaml index db0f730f58..6886b50bf6 100644 --- a/examples/iris/cortex.yaml +++ b/examples/iris/cortex.yaml @@ -3,7 +3,7 @@ - kind: api name: tensorflow - model: s3://cortex-examples/iris/tensorflow.zip + model: s3://cortex-examples/iris/1 - kind: api name: pytorch diff --git a/examples/iris/models/tensorflow_model.py b/examples/iris/models/tensorflow_model.py index 5e197be9bf..87e6d654dc 100644 --- a/examples/iris/models/tensorflow_model.py +++ b/examples/iris/models/tensorflow_model.py @@ -88,7 +88,3 @@ def my_model(features, labels, mode, params): # zip the estimator export dir (the exported path looks like iris_tf_export/export/estimator/1562353043/) estimator_dir = EXPORT_DIR + "/export/estimator" -shutil.make_archive("tensorflow", "zip", os.path.join(estimator_dir)) - -# clean up -shutil.rmtree(EXPORT_DIR) diff --git a/examples/sentiment/cortex.yaml b/examples/sentiment/cortex.yaml index 1c638c657b..431d0aee88 100644 --- a/examples/sentiment/cortex.yaml +++ b/examples/sentiment/cortex.yaml @@ -3,5 +3,5 @@ - kind: api name: classifier - model: s3://cortex-examples/sentiment/bert.zip + model: s3://cortex-examples/sentiment/1565392692 request_handler: sentiment.py diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index 93b750aa5e..0d0e9f126b 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -360,3 +360,24 @@ func IsS3PathFileExternal(s3Path string) (bool, error) { return IsS3FileExternal(bucket, key) } + +func ListObjectsExternal(s3Path string) (*s3.ListObjectsV2Output, error) { + bucket, key, err := SplitS3Path(s3Path) + if err != nil { + return nil, err + } + + region, err := GetBucketRegion(bucket) + if err != nil { + return nil, err + } + + sess := session.Must(session.NewSession(&aws.Config{ + Region: aws.String(region), + })) + + return s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(key), + }) +} diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go index 7e8c16ffe8..3b4f3d5511 100644 --- a/pkg/lib/models/models.go +++ b/pkg/lib/models/models.go @@ -1,11 +1,44 @@ package models -import "strings" +import ( + "fmt" + "strconv" + 
"github.com/cortexlabs/cortex/pkg/lib/sets/strset" + + "github.com/cortexlabs/cortex/pkg/lib/aws" +) + +// IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models +// Must contain the following structure: +// - 1523423423/ (timestamped prefix) +// - saved_model.pb +// - variables/ +// - variables.index +// - variables.data-00000-of-00001 func IsValidS3Directory(path string) bool { - if strings.HasSuffix(path, ".zip") { - return true + listOut, err := aws.ListObjectsExternal(path) + if err != nil { + return false + } + + if listOut.Prefix == nil { + return false + } + + prefix := *listOut.Prefix + if _, err := strconv.ParseInt(prefix, 10, 64); err != nil { + return false + } + + objects := strset.New() + for _, o := range listOut.Contents { + objects.Add(*o.Key) } - return false + return objects.Has( + fmt.Sprintf("%s/saved_model.pb", prefix), + fmt.Sprintf("%s/variables/variables.index", prefix), + fmt.Sprintf("%s/variables/variables.data-00000-of-00001", prefix), + ) } diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 5100fd4c15..e57592412e 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -119,24 +119,27 @@ func (apis APIs) Validate() error { func (api *API) Validate() error { if yaml.StartsWithEscapedAtSymbol(api.Model) { api.ModelFormat = TensorFlowModelFormat - } else { - if !aws.IsValidS3Path(api.Model) { - return errors.Wrap(ErrorInvalidS3PathOrResourceReference(api.Model), Identify(api), ModelKey) + if err := api.Compute.Validate(); err != nil { + return errors.Wrap(err, Identify(api), ComputeKey) } - if api.ModelFormat == UnknownModelFormat { - if strings.HasSuffix(api.Model, ".onnx") { - api.ModelFormat = ONNXModelFormat - } else if models.IsValidS3Directory(api.Model) { - api.ModelFormat = TensorFlowModelFormat - } else { - return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) - } - } + return nil + } + if !aws.IsValidS3Path(api.Model) { + return errors.Wrap(ErrorInvalidS3PathOrResourceReference(api.Model), Identify(api), ModelKey) + } + + switch { + case strings.HasSuffix(api.Model, ".onnx"): + api.ModelFormat = ONNXModelFormat if ok, err := aws.IsS3PathFileExternal(api.Model); err != nil || !ok { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } + case models.IsValidS3Directory(api.Model): + api.ModelFormat = TensorFlowModelFormat + default: + return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) } if err := api.Compute.Validate(); err != nil { diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index dcaa612008..b6f963a4dc 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -249,3 +249,19 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): if obj is None: return None return json.loads(obj.decode("utf-8")) + + def download_dir_external(self, s3_path, local_path): + util.mkdir_p(local_path) + bucket_name, key = self.deconstruct_s3_path(s3_path) + objects = self.s3.list_objects( + Bucket=bucket_name, + Prefix=key, + )["Contents"] + for obj in objects: + if not os.path.exists(os.path.dirname(obj["Key"])): + util.mkdir_p(os.path.join(local_path, os.path.dirname(obj["Key"]))) + + if obj["Key"][-1] == "/": + continue + + self.s3.download_file(bucket_name, obj["Key"], os.path.join(local_path, obj["Key"])) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 
d6a22ac7ac..9745b03506 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -480,13 +480,11 @@ def start(args): if api.get("request_handler") is not None: local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) - if not util.is_resource_ref(api["model"]): - if not os.path.isdir(args.model_dir): - ctx.storage.download_and_unzip_external(api["model"], args.model_dir) + if args.only_download: + ctx.storage.download_dir_external(api["model"], args.model_dir) + return - if args.only_download: - return - else: + if util.is_resource_ref(api["model"]): model_name = util.get_resource_ref(api["model"]) model = ctx.models[model_name] estimator = ctx.estimators[model["estimator"]] From a542fcfa693054d6a146ab65c13a5a51a4e2ac1d Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 12:55:07 -0400 Subject: [PATCH 03/26] fix downloading --- pkg/lib/models/models.go | 14 ++++++++++---- pkg/workloads/cortex/lib/storage/s3.py | 13 ++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go index 3b4f3d5511..e06ce964f2 100644 --- a/pkg/lib/models/models.go +++ b/pkg/lib/models/models.go @@ -3,6 +3,7 @@ package models import ( "fmt" "strconv" + "strings" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" @@ -15,7 +16,7 @@ import ( // - saved_model.pb // - variables/ // - variables.index -// - variables.data-00000-of-00001 +// - variables.data-00000-of-00001 (there are a variable number of these files) func IsValidS3Directory(path string) bool { listOut, err := aws.ListObjectsExternal(path) if err != nil { @@ -27,18 +28,23 @@ func IsValidS3Directory(path string) bool { } prefix := *listOut.Prefix - if _, err := strconv.ParseInt(prefix, 10, 64); err != nil { + prefixParts := strings.Split(prefix, "/") + timestamp := prefixParts[len(prefixParts)-1] + if _, err := strconv.ParseInt(timestamp, 10, 64); err != nil { return false } + var containsVariableDataFile bool objects := strset.New() for _, o := range listOut.Contents { + if strings.Contains(*o.Key, "variables/variables.data-00000-of") { + containsVariableDataFile = true + } objects.Add(*o.Key) } return objects.Has( fmt.Sprintf("%s/saved_model.pb", prefix), fmt.Sprintf("%s/variables/variables.index", prefix), - fmt.Sprintf("%s/variables/variables.data-00000-of-00001", prefix), - ) + ) && containsVariableDataFile } diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index b6f963a4dc..75cb6ecd4b 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -252,16 +252,19 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): def download_dir_external(self, s3_path, local_path): util.mkdir_p(local_path) - bucket_name, key = self.deconstruct_s3_path(s3_path) + bucket_name, prefix = self.deconstruct_s3_path(s3_path) objects = self.s3.list_objects( Bucket=bucket_name, - Prefix=key, + Prefix=prefix, )["Contents"] + + timestampDir = prefix.split("/")[-1] for obj in objects: - if not os.path.exists(os.path.dirname(obj["Key"])): - util.mkdir_p(os.path.join(local_path, os.path.dirname(obj["Key"]))) + local_key = obj["Key"].lstrip(prefix[:len(timestampDir)]) + if not os.path.exists(os.path.dirname(local_key)): + util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) if obj["Key"][-1] == "/": continue - self.s3.download_file(bucket_name, obj["Key"], os.path.join(local_path, obj["Key"])) + self.s3.download_file(bucket_name, 
obj["Key"], os.path.join(local_path, local_key)) From 80910f2fbaad94b1ecda83eeca6f509f9b3c497b Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 12:56:25 -0400 Subject: [PATCH 04/26] clean up --- examples/spell/cortex.yaml | 12 ------------ examples/spell/request.py | 35 ----------------------------------- 2 files changed, 47 deletions(-) delete mode 100644 examples/spell/cortex.yaml delete mode 100644 examples/spell/request.py diff --git a/examples/spell/cortex.yaml b/examples/spell/cortex.yaml deleted file mode 100644 index c25b940391..0000000000 --- a/examples/spell/cortex.yaml +++ /dev/null @@ -1,12 +0,0 @@ -- kind: deployment - name: spell - -- kind: api - name: spell - model: s3://cortex-yolo/spell197.zip - compute: - min_replicas: 1 - max_replicas: 1 - cpu: "3" - gpu: 1 - mem: "48Gi" diff --git a/examples/spell/request.py b/examples/spell/request.py deleted file mode 100644 index b744937286..0000000000 --- a/examples/spell/request.py +++ /dev/null @@ -1,35 +0,0 @@ -import numpy as np -import requests -from PIL import Image - -TF_SERVING_API = ( - "https://acea1a9abbdca11e9b29c0ea6849e7fb-276107737.us-west-2.elb.amazonaws.com/spell/spell" -) - - -def load_image(path): - img = Image.open(path) - img.load() - data = np.asarray(img, dtype="uint8") - data = np.delete(data, 0, 2) - data = np.expand_dims(data, axis=0) - return data - - -def load_zeros(): - return np.zeros((1, 1024, 768), dtype="uint8") - - -# image = load_image("./mower.jpg") -# image = load_image("./mower2.jpg") -image = load_zeros() - -print(image.shape, image.dtype) - -predict_body = {"samples": [{"images": [image.tolist()]}]} - -print("starting request") -result = requests.post(TF_SERVING_API, json=predict_body, verify=False) -print("finished request") - -print(result.text) From 210dac79b66443748f290fd7c30bb57c86aed2df Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 12:59:43 -0400 Subject: [PATCH 05/26] format and clean up names --- pkg/workloads/cortex/lib/storage/s3.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 75cb6ecd4b..4c6c5d14a3 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -253,14 +253,11 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): def download_dir_external(self, s3_path, local_path): util.mkdir_p(local_path) bucket_name, prefix = self.deconstruct_s3_path(s3_path) - objects = self.s3.list_objects( - Bucket=bucket_name, - Prefix=prefix, - )["Contents"] + objects = self.s3.list_objects(Bucket=bucket_name, Prefix=prefix)["Contents"] - timestampDir = prefix.split("/")[-1] + timestamp = prefix.split("/")[-1] for obj in objects: - local_key = obj["Key"].lstrip(prefix[:len(timestampDir)]) + local_key = obj["Key"].lstrip(prefix[: len(timestamp)]) if not os.path.exists(os.path.dirname(local_key)): util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) From df8238e3257ebe1df1ea1d95ea2459a27a8d5874 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 13:53:36 -0400 Subject: [PATCH 06/26] fix dir cleaning --- pkg/workloads/cortex/lib/storage/s3.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 4c6c5d14a3..3c991d80a1 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -254,10 +254,9 @@ def download_dir_external(self, 
s3_path, local_path): util.mkdir_p(local_path) bucket_name, prefix = self.deconstruct_s3_path(s3_path) objects = self.s3.list_objects(Bucket=bucket_name, Prefix=prefix)["Contents"] - timestamp = prefix.split("/")[-1] for obj in objects: - local_key = obj["Key"].lstrip(prefix[: len(timestamp)]) + local_key = obj["Key"].lstrip(prefix[: -len(timestamp)]) if not os.path.exists(os.path.dirname(local_key)): util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) From dc5753f6481eb43e1ded823e3fa6edfceea29ff3 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 14:10:06 -0400 Subject: [PATCH 07/26] timestamp -> version --- docs/apis/packaging-models.md | 2 +- pkg/lib/models/models.go | 6 +++--- pkg/workloads/cortex/lib/storage/s3.py | 17 +++-------------- pkg/workloads/cortex/tf_api/api.py | 20 +++++++++++++++++++- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/docs/apis/packaging-models.md b/docs/apis/packaging-models.md index 5629c2a68c..48285b413f 100644 --- a/docs/apis/packaging-models.md +++ b/docs/apis/packaging-models.md @@ -22,7 +22,7 @@ eval_spec = tf.estimator.EvalSpec(eval_input_fn, exporters=[exporter], name="est tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec) ``` -Upload the exported timestamp directory to Amazon S3 using the AWS web console or CLI: +Upload the exported version directory to Amazon S3 using the AWS web console or CLI: ```text $ aws s3 sync ./iris/export/estimator/156293432 s3://my-bucket/156293432 diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go index e06ce964f2..3965a8daba 100644 --- a/pkg/lib/models/models.go +++ b/pkg/lib/models/models.go @@ -12,7 +12,7 @@ import ( // IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models // Must contain the following structure: -// - 1523423423/ (timestamped prefix) +// - 1523423423/ (version prefix, usually a timestamp) // - saved_model.pb // - variables/ // - variables.index @@ -29,8 +29,8 @@ func IsValidS3Directory(path string) bool { prefix := *listOut.Prefix prefixParts := strings.Split(prefix, "/") - timestamp := prefixParts[len(prefixParts)-1] - if _, err := strconv.ParseInt(timestamp, 10, 64); err != nil { + version := prefixParts[len(prefixParts)-1] + if _, err := strconv.ParseInt(version, 10, 64); err != nil { return false } diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 3c991d80a1..d473e60103 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -250,17 +250,6 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): return None return json.loads(obj.decode("utf-8")) - def download_dir_external(self, s3_path, local_path): - util.mkdir_p(local_path) - bucket_name, prefix = self.deconstruct_s3_path(s3_path) - objects = self.s3.list_objects(Bucket=bucket_name, Prefix=prefix)["Contents"] - timestamp = prefix.split("/")[-1] - for obj in objects: - local_key = obj["Key"].lstrip(prefix[: -len(timestamp)]) - if not os.path.exists(os.path.dirname(local_key)): - util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) - - if obj["Key"][-1] == "/": - continue - - self.s3.download_file(bucket_name, obj["Key"], os.path.join(local_path, local_key)) + def list_objects(self, s3_path): + bucket, prefix = self.deconstruct_s3_path(s3_path) + return self.s3.list_objects(Bucket=bucket, Prefix=prefix) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 
9745b03506..644add050d 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -442,6 +442,24 @@ def get_signature(app_name, api_name): return jsonify(response) +def download_dir_external(ctx, s3_path, local_path): + util.mkdir_p(local_path) + bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) + objects = ctx.storage.list_objects(s3_path)["Contents"] + version = prefix.split("/")[-1] + for obj in objects: + local_key = obj["Key"].lstrip(prefix[: -len(version)]) + if not os.path.exists(os.path.dirname(local_key)): + util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) + + if obj["Key"][-1] == "/": + continue + + ctx.storage.download_file_external( + bucket_name + "/" + obj["Key"], os.path.join(local_path, local_key) + ) + + def validate_model_dir(model_dir): """ validates that model_dir has the expected directory tree. @@ -481,7 +499,7 @@ def start(args): local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) if args.only_download: - ctx.storage.download_dir_external(api["model"], args.model_dir) + download_dir_external(ctx, api["model"], args.model_dir) return if util.is_resource_ref(api["model"]): From 6a8505cbf47bcb326e38a56906eba21ccb837719 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 14:42:37 -0400 Subject: [PATCH 08/26] add license --- pkg/lib/models/models.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go index 3965a8daba..73ac165785 100644 --- a/pkg/lib/models/models.go +++ b/pkg/lib/models/models.go @@ -1,3 +1,19 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package models import ( From 828863bca6e9b19cd1801ee3c9954c6e9c0f7d2d Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 23:28:54 -0400 Subject: [PATCH 09/26] simplify path parsing --- pkg/lib/models/models.go | 3 +-- pkg/workloads/cortex/tf_api/api.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go index 73ac165785..54076609cb 100644 --- a/pkg/lib/models/models.go +++ b/pkg/lib/models/models.go @@ -21,9 +21,8 @@ import ( "strconv" "strings" - "github.com/cortexlabs/cortex/pkg/lib/sets/strset" - "github.com/cortexlabs/cortex/pkg/lib/aws" + "github.com/cortexlabs/cortex/pkg/lib/sets/strset" ) // IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 644add050d..8180556ed5 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -448,7 +448,7 @@ def download_dir_external(ctx, s3_path, local_path): objects = ctx.storage.list_objects(s3_path)["Contents"] version = prefix.split("/")[-1] for obj in objects: - local_key = obj["Key"].lstrip(prefix[: -len(version)]) + local_key = obj["Key"][len(prefix)-len(version):] if not os.path.exists(os.path.dirname(local_key)): util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) From e4c28a881c94307183c94ff4a0ffd73640c829c2 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Wed, 14 Aug 2019 23:46:24 -0400 Subject: [PATCH 10/26] address some comments --- docs/apis/packaging-models.md | 4 ++-- examples/iris/cortex.yaml | 2 +- pkg/workloads/cortex/lib/storage/local.py | 3 +++ pkg/workloads/cortex/lib/storage/s3.py | 6 +++--- pkg/workloads/cortex/tf_api/api.py | 8 ++++---- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/docs/apis/packaging-models.md b/docs/apis/packaging-models.md index 48285b413f..61f590231c 100644 --- a/docs/apis/packaging-models.md +++ b/docs/apis/packaging-models.md @@ -25,7 +25,7 @@ tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec) Upload the exported version directory to Amazon S3 using the AWS web console or CLI: ```text -$ aws s3 sync ./iris/export/estimator/156293432 s3://my-bucket/156293432 +$ aws s3 sync ./iris/export/estimator/156293432 s3://my-bucket/iris/156293432 ``` Reference your model in an `api`: @@ -33,7 +33,7 @@ Reference your model in an `api`: ```yaml - kind: api name: my-api - model: s3://my-bucket/156293432 + model: s3://my-bucket/iris/156293432 ``` ## ONNX diff --git a/examples/iris/cortex.yaml b/examples/iris/cortex.yaml index 6886b50bf6..1c183c5a9f 100644 --- a/examples/iris/cortex.yaml +++ b/examples/iris/cortex.yaml @@ -3,7 +3,7 @@ - kind: api name: tensorflow - model: s3://cortex-examples/iris/1 + model: s3://cortex-examples/iris/tensorflow/1560263532 - kind: api name: pytorch diff --git a/pkg/workloads/cortex/lib/storage/local.py b/pkg/workloads/cortex/lib/storage/local.py index 51f91d1b94..6e7ce5a12e 100644 --- a/pkg/workloads/cortex/lib/storage/local.py +++ b/pkg/workloads/cortex/lib/storage/local.py @@ -141,3 +141,6 @@ def download_and_unzip(self, key, local_dir): local_zip = os.path.join(local_dir, "zip.zip") self.download_file(key, local_zip) util.extract_zip(local_zip, delete_zip_file=True) + + def list_objects(self, path): + return os.listdir(path) diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index d473e60103..61b01f5af7 100644 --- 
a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -250,6 +250,6 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): return None return json.loads(obj.decode("utf-8")) - def list_objects(self, s3_path): - bucket, prefix = self.deconstruct_s3_path(s3_path) - return self.s3.list_objects(Bucket=bucket, Prefix=prefix) + def list_objects(self, path): + bucket, prefix = self.deconstruct_s3_path(path) + return [obj["Key"] for obj in self.s3.list_objects(Bucket=bucket, Prefix=prefix)["Contents"]] diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 8180556ed5..a53971925d 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -445,18 +445,18 @@ def get_signature(app_name, api_name): def download_dir_external(ctx, s3_path, local_path): util.mkdir_p(local_path) bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) - objects = ctx.storage.list_objects(s3_path)["Contents"] + objects = ctx.storage.list_objects(s3_path) version = prefix.split("/")[-1] for obj in objects: - local_key = obj["Key"][len(prefix)-len(version):] + local_key = obj[len(prefix)-len(version):] if not os.path.exists(os.path.dirname(local_key)): util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) - if obj["Key"][-1] == "/": + if obj[-1] == "/": continue ctx.storage.download_file_external( - bucket_name + "/" + obj["Key"], os.path.join(local_path, local_key) + bucket_name + "/" + obj, os.path.join(local_path, local_key) ) From 6dc1552402ba6a56c93f1a160a045cba1e9167c1 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Thu, 15 Aug 2019 10:16:36 -0400 Subject: [PATCH 11/26] fix error message --- pkg/operator/api/userconfig/errors.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index 62ff9cad25..e01ca9013a 100644 --- a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -600,9 +600,18 @@ func ErrorExternalNotFound(path string) error { } func ErrorUnableToInferModelFormat() error { + message := "unable to infer " + ModelFormatKey + ": path to model should end in .onnx for ONNX models, or the " + ModelFormatKey + " key must be specified\n" + message += "For TF models, the path should be a directory with the following structure: \n\n" + message += "1523423423/ (version prefix, usually a timestamp)\n" + message += "\tsaved_model.pb\n" + message += "\tvariables/\n" + message += "\t\tvariables.index\n" + message += "\t\tvariables.data-00000-of-00003\n" + message += "\t\tvariables.data-00001-of-00003\n" + message += "\t\tvariables.data-00002-of-...\n\n" return Error{ Kind: ErrUnableToInferModelFormat, - message: "unable to infer " + ModelFormatKey + ": path to model should end in .zip for TensorFlow models, .onnx for ONNX models, or the " + ModelFormatKey + " key must be specified", + message: message, } } From f3ba3ddbf9d5e535dd34e66abf607b5f75320d12 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Thu, 15 Aug 2019 10:24:33 -0400 Subject: [PATCH 12/26] remove newline, format --- pkg/operator/api/userconfig/errors.go | 2 +- pkg/workloads/cortex/lib/storage/s3.py | 4 +++- pkg/workloads/cortex/tf_api/api.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index e01ca9013a..e6962ffbca 100644 --- a/pkg/operator/api/userconfig/errors.go +++ 
b/pkg/operator/api/userconfig/errors.go @@ -608,7 +608,7 @@ func ErrorUnableToInferModelFormat() error { message += "\t\tvariables.index\n" message += "\t\tvariables.data-00000-of-00003\n" message += "\t\tvariables.data-00001-of-00003\n" - message += "\t\tvariables.data-00002-of-...\n\n" + message += "\t\tvariables.data-00002-of-...\n" return Error{ Kind: ErrUnableToInferModelFormat, message: message, diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 61b01f5af7..0b0515c428 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -252,4 +252,6 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): def list_objects(self, path): bucket, prefix = self.deconstruct_s3_path(path) - return [obj["Key"] for obj in self.s3.list_objects(Bucket=bucket, Prefix=prefix)["Contents"]] + return [ + obj["Key"] for obj in self.s3.list_objects(Bucket=bucket, Prefix=prefix)["Contents"] + ] diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index a53971925d..094b682179 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -448,7 +448,7 @@ def download_dir_external(ctx, s3_path, local_path): objects = ctx.storage.list_objects(s3_path) version = prefix.split("/")[-1] for obj in objects: - local_key = obj[len(prefix)-len(version):] + local_key = obj[len(prefix) - len(version) :] if not os.path.exists(os.path.dirname(local_key)): util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) From 48815853e83c34f527636515c2a393fc7d7514a9 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Thu, 15 Aug 2019 17:36:35 -0400 Subject: [PATCH 13/26] clean up, address comments --- pkg/lib/aws/s3.go | 59 +++++++++++++------------- pkg/lib/models/models.go | 35 ++++----------- pkg/operator/api/userconfig/apis.go | 24 ++++++++--- pkg/operator/api/userconfig/config.go | 2 + pkg/operator/api/userconfig/errors.go | 31 ++++++++++---- pkg/workloads/cortex/lib/storage/s3.py | 4 +- pkg/workloads/cortex/tf_api/api.py | 14 ++---- 7 files changed, 86 insertions(+), 83 deletions(-) diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index 0d0e9f126b..fd15da4db9 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -318,7 +318,7 @@ func IsS3PrefixExternal(bucket string, prefix string) (bool, error) { return hasPrefix, nil } -func IsS3FileExternal(bucket string, key string) (bool, error) { +func IsS3FileExternal(bucket string, keys ...string) (bool, error) { region, err := GetBucketRegion(bucket) if err != nil { return false, err @@ -328,17 +328,19 @@ func IsS3FileExternal(bucket string, key string) (bool, error) { Region: aws.String(region), })) - _, err = s3.New(sess).HeadObject(&s3.HeadObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) + for _, key := range keys { + _, err = s3.New(sess).HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) - if IsNotFoundErr(err) { - return false, nil - } + if IsNotFoundErr(err) { + return false, nil + } - if err != nil { - return false, errors.Wrap(err, bucket, key) + if err != nil { + return false, errors.Wrap(err, bucket, key) + } } return true, nil @@ -352,32 +354,29 @@ func IsS3aPathPrefixExternal(s3aPath string) (bool, error) { return IsS3PrefixExternal(bucket, prefix) } -func IsS3PathFileExternal(s3Path string) (bool, error) { - bucket, key, err := SplitS3Path(s3Path) +func IsS3PathPrefixExternal(s3Path string) (bool, error) { + bucket, prefix, err := 
SplitS3Path(s3Path) if err != nil { return false, err } - - return IsS3FileExternal(bucket, key) + return IsS3PrefixExternal(bucket, prefix) } -func ListObjectsExternal(s3Path string) (*s3.ListObjectsV2Output, error) { - bucket, key, err := SplitS3Path(s3Path) - if err != nil { - return nil, err - } +func IsS3PathFileExternal(s3Paths ...string) (bool, error) { + for _, s3Path := range s3Paths { + bucket, key, err := SplitS3Path(s3Path) + if err != nil { + return false, err + } + exists, err := IsS3FileExternal(bucket, key) + if err != nil { + return false, err + } - region, err := GetBucketRegion(bucket) - if err != nil { - return nil, err + if !exists { + return false, nil + } } - sess := session.Must(session.NewSession(&aws.Config{ - Region: aws.String(region), - })) - - return s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{ - Bucket: aws.String(bucket), - Prefix: aws.String(key), - }) + return true, nil } diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go index 54076609cb..068160a356 100644 --- a/pkg/lib/models/models.go +++ b/pkg/lib/models/models.go @@ -18,11 +18,8 @@ package models import ( "fmt" - "strconv" - "strings" "github.com/cortexlabs/cortex/pkg/lib/aws" - "github.com/cortexlabs/cortex/pkg/lib/sets/strset" ) // IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models @@ -33,33 +30,17 @@ import ( // - variables.index // - variables.data-00000-of-00001 (there are a variable number of these files) func IsValidS3Directory(path string) bool { - listOut, err := aws.ListObjectsExternal(path) - if err != nil { + if valid, err := aws.IsS3PathFileExternal( + fmt.Sprintf("%s/saved_model.pb", path), + fmt.Sprintf("%s/variables/variables.index", path), + ); err != nil || !valid { return false } - if listOut.Prefix == nil { + if valid, err := aws.IsS3PathPrefixExternal( + fmt.Sprintf("%s/variables/variables.data-00000-of", path), + ); err != nil || !valid { return false } - - prefix := *listOut.Prefix - prefixParts := strings.Split(prefix, "/") - version := prefixParts[len(prefixParts)-1] - if _, err := strconv.ParseInt(version, 10, 64); err != nil { - return false - } - - var containsVariableDataFile bool - objects := strset.New() - for _, o := range listOut.Contents { - if strings.Contains(*o.Key, "variables/variables.data-00000-of") { - containsVariableDataFile = true - } - objects.Add(*o.Key) - } - - return objects.Has( - fmt.Sprintf("%s/saved_model.pb", prefix), - fmt.Sprintf("%s/variables/variables.index", prefix), - ) && containsVariableDataFile + return true } diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index e57592412e..61c98c5035 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -130,16 +130,28 @@ func (api *API) Validate() error { return errors.Wrap(ErrorInvalidS3PathOrResourceReference(api.Model), Identify(api), ModelKey) } - switch { - case strings.HasSuffix(api.Model, ".onnx"): - api.ModelFormat = ONNXModelFormat + switch api.ModelFormat { + case ONNXModelFormat: if ok, err := aws.IsS3PathFileExternal(api.Model); err != nil || !ok { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } - case models.IsValidS3Directory(api.Model): - api.ModelFormat = TensorFlowModelFormat + case TensorFlowModelFormat: + if !models.IsValidS3Directory(api.Model) { + return errors.Wrap(ErrorInvalidTensorflowDir(api.Model), Identify(api), ModelKey) + } default: - return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) + 
switch { + case strings.HasSuffix(api.Model, ".onnx"): + api.ModelFormat = ONNXModelFormat + if ok, err := aws.IsS3PathFileExternal(api.Model); err != nil || !ok { + return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) + } + case models.IsValidS3Directory(api.Model): + api.ModelFormat = TensorFlowModelFormat + default: + return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) + } + } if err := api.Compute.Validate(); err != nil { diff --git a/pkg/operator/api/userconfig/config.go b/pkg/operator/api/userconfig/config.go index 55a3d4e408..4f217fbaac 100644 --- a/pkg/operator/api/userconfig/config.go +++ b/pkg/operator/api/userconfig/config.go @@ -19,6 +19,7 @@ package userconfig import ( "fmt" "io/ioutil" + "log" "github.com/cortexlabs/yaml" @@ -120,6 +121,7 @@ func (config *Config) ValidatePartial() error { } } if config.APIs != nil { + log.Println(config.APIs) if err := config.APIs.Validate(); err != nil { return err } diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index e6962ffbca..5e94a57699 100644 --- a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -80,6 +80,7 @@ const ( ErrInvalidS3PathOrResourceReference ErrUnableToInferModelFormat ErrExternalNotFound + ErrInvalidTensorflowDir ) var errorKinds = []string{ @@ -133,9 +134,10 @@ var errorKinds = []string{ "err_invalid_s3_path_or_resource_reference", "err_unable_to_infer_model_format", "err_external_not_found", + "err_invalid_tensorflow_dir", } -var _ = [1]int{}[int(ErrExternalNotFound)-(len(errorKinds)-1)] // Ensure list length matches +var _ = [1]int{}[int(ErrInvalidTensorflowDir)-(len(errorKinds)-1)] // Ensure list length matches func (t ErrorKind) String() string { return errorKinds[t] @@ -599,21 +601,32 @@ func ErrorExternalNotFound(path string) error { } } +var tfExpectedStructMessage string = ` + For TF models, the path should be a directory with the following structure: \n\n + 1523423423/ (version prefix, usually a timestamp)\n + \tsaved_model.pb\n + \tvariables/\n + \t\tvariables.index\n + \t\tvariables.data-00000-of-00003\n + \t\tvariables.data-00001-of-00003\n + \t\tvariables.data-00002-of-...\n` + func ErrorUnableToInferModelFormat() error { message := "unable to infer " + ModelFormatKey + ": path to model should end in .onnx for ONNX models, or the " + ModelFormatKey + " key must be specified\n" - message += "For TF models, the path should be a directory with the following structure: \n\n" - message += "1523423423/ (version prefix, usually a timestamp)\n" - message += "\tsaved_model.pb\n" - message += "\tvariables/\n" - message += "\t\tvariables.index\n" - message += "\t\tvariables.data-00000-of-00003\n" - message += "\t\tvariables.data-00001-of-00003\n" - message += "\t\tvariables.data-00002-of-...\n" + message += tfExpectedStructMessage return Error{ Kind: ErrUnableToInferModelFormat, message: message, } } +func ErrorInvalidTensorflowDir(path string) error { + message := "invalid TF export directory.\n" + message += tfExpectedStructMessage + return Error{ + Kind: ErrInvalidTensorflowDir, + message: message, + } +} func ErrorInvalidS3PathOrResourceReference(provided string) error { s3ErrMsg := aws.ErrorInvalidS3Path(provided).Error() diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 0b0515c428..279b09d194 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -253,5 +253,7 @@ def get_json_external(self, s3_path, 
num_retries=0, retry_delay_sec=2): def list_objects(self, path): bucket, prefix = self.deconstruct_s3_path(path) return [ - obj["Key"] for obj in self.s3.list_objects(Bucket=bucket, Prefix=prefix)["Contents"] + obj["Key"] + for obj in self.s3.list_objects(Bucket=bucket, Prefix=prefix)["Contents"] + if obj["Key"][:1] != "/" ] diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 094b682179..003b519469 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -452,9 +452,6 @@ def download_dir_external(ctx, s3_path, local_path): if not os.path.exists(os.path.dirname(local_key)): util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) - if obj[-1] == "/": - continue - ctx.storage.download_file_external( bucket_name + "/" + obj, os.path.join(local_path, local_key) ) @@ -499,7 +496,10 @@ def start(args): local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) if args.only_download: - download_dir_external(ctx, api["model"], args.model_dir) + if util.is_resource_ref(api["model"]): + ctx.storage.download_and_unzip(model["key"], args.model_dir) + else: + download_dir_external(ctx, api["model"], args.model_dir) return if util.is_resource_ref(api["model"]): @@ -507,12 +507,6 @@ def start(args): model = ctx.models[model_name] estimator = ctx.estimators[model["estimator"]] - if not os.path.isdir(args.model_dir): - ctx.storage.download_and_unzip(model["key"], args.model_dir) - - if args.only_download: - return - local_cache["model"] = model local_cache["estimator"] = estimator local_cache["target_col"] = ctx.columns[util.get_resource_ref(model["target_column"])] From 2c2bed85ee5c6fcf0fe340415aeeba809fda65c0 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Thu, 15 Aug 2019 17:37:19 -0400 Subject: [PATCH 14/26] remove log --- pkg/operator/api/userconfig/config.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/operator/api/userconfig/config.go b/pkg/operator/api/userconfig/config.go index 4f217fbaac..55a3d4e408 100644 --- a/pkg/operator/api/userconfig/config.go +++ b/pkg/operator/api/userconfig/config.go @@ -19,7 +19,6 @@ package userconfig import ( "fmt" "io/ioutil" - "log" "github.com/cortexlabs/yaml" @@ -121,7 +120,6 @@ func (config *Config) ValidatePartial() error { } } if config.APIs != nil { - log.Println(config.APIs) if err := config.APIs.Validate(); err != nil { return err } From f265ff0fa98c5b32d6f21df453cd352c2c52f2b9 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Thu, 15 Aug 2019 17:38:43 -0400 Subject: [PATCH 15/26] remove extra validation --- pkg/operator/api/userconfig/config.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/operator/api/userconfig/config.go b/pkg/operator/api/userconfig/config.go index 55a3d4e408..146fa6afa8 100644 --- a/pkg/operator/api/userconfig/config.go +++ b/pkg/operator/api/userconfig/config.go @@ -154,11 +154,6 @@ func (config *Config) ValidatePartial() error { } func (config *Config) Validate(envName string) error { - err := config.ValidatePartial() - if err != nil { - return err - } - if config.App == nil { return ErrorMissingAppDefinition() } From 45a4a2656689bb2eded41f1cd3c97d7816bbdc37 Mon Sep 17 00:00:00 2001 From: Ivan Zhang Date: Thu, 15 Aug 2019 17:47:34 -0400 Subject: [PATCH 16/26] remove type --- pkg/operator/api/userconfig/errors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index 5e94a57699..dc5475624b 100644 --- 
a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -601,7 +601,7 @@ func ErrorExternalNotFound(path string) error { } } -var tfExpectedStructMessage string = ` +var tfExpectedStructMessage = ` For TF models, the path should be a directory with the following structure: \n\n 1523423423/ (version prefix, usually a timestamp)\n \tsaved_model.pb\n From 1ab8a277c43699b25fa903b5a9397f294bd5f652 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 15 Aug 2019 15:11:17 -0700 Subject: [PATCH 17/26] Update error message --- pkg/operator/api/userconfig/errors.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index dc5475624b..3e5f20821f 100644 --- a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -601,19 +601,19 @@ func ErrorExternalNotFound(path string) error { } } -var tfExpectedStructMessage = ` - For TF models, the path should be a directory with the following structure: \n\n - 1523423423/ (version prefix, usually a timestamp)\n - \tsaved_model.pb\n - \tvariables/\n - \t\tvariables.index\n - \t\tvariables.data-00000-of-00003\n - \t\tvariables.data-00001-of-00003\n - \t\tvariables.data-00002-of-...\n` +var onnxExpectedStructMessage = `For ONNX models, the path should end in .onnx` + +var tfExpectedStructMessage = `For TensorFlow models, the path should be a directory with the following structure: + 1523423423/ (version prefix, usually a timestamp) + ├── saved_model.pb + └── variables/ + ├── variables.index + ├── variables.data-00000-of-00003 + ├── variables.data-00001-of-00003 + └── variables.data-00002-of-...` func ErrorUnableToInferModelFormat() error { - message := "unable to infer " + ModelFormatKey + ": path to model should end in .onnx for ONNX models, or the " + ModelFormatKey + " key must be specified\n" - message += tfExpectedStructMessage + message := ModelFormatKey + " not specified, and could not be inferred\n" + onnxExpectedStructMessage + "\n" + tfExpectedStructMessage return Error{ Kind: ErrUnableToInferModelFormat, message: message, From 1cbb4e5c2a7c25b008443a1486eaf68f5c09028b Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 15 Aug 2019 15:19:29 -0700 Subject: [PATCH 18/26] Update api.py --- pkg/workloads/cortex/tf_api/api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 003b519469..fdb097f567 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -495,11 +495,13 @@ def start(args): if api.get("request_handler") is not None: local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) - if args.only_download: + if not os.path.isdir(args.model_dir): if util.is_resource_ref(api["model"]): ctx.storage.download_and_unzip(model["key"], args.model_dir) else: download_dir_external(ctx, api["model"], args.model_dir) + + if args.only_download: return if util.is_resource_ref(api["model"]): From 829b62801678ff57ffa40d24ca439dcbabc758fa Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 16 Aug 2019 13:57:17 -0400 Subject: [PATCH 19/26] address comments --- pkg/lib/models/models.go | 46 ----------------------- pkg/operator/api/userconfig/apis.go | 28 ++++++++++++-- pkg/workloads/cortex/lib/storage/local.py | 7 ++++ pkg/workloads/cortex/lib/storage/s3.py | 6 +-- pkg/workloads/cortex/tf_api/api.py | 17 ++++++--- 5 files changed, 45 insertions(+), 59 
deletions(-) delete mode 100644 pkg/lib/models/models.go diff --git a/pkg/lib/models/models.go b/pkg/lib/models/models.go deleted file mode 100644 index 068160a356..0000000000 --- a/pkg/lib/models/models.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2019 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package models - -import ( - "fmt" - - "github.com/cortexlabs/cortex/pkg/lib/aws" -) - -// IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models -// Must contain the following structure: -// - 1523423423/ (version prefix, usually a timestamp) -// - saved_model.pb -// - variables/ -// - variables.index -// - variables.data-00000-of-00001 (there are a variable number of these files) -func IsValidS3Directory(path string) bool { - if valid, err := aws.IsS3PathFileExternal( - fmt.Sprintf("%s/saved_model.pb", path), - fmt.Sprintf("%s/variables/variables.index", path), - ); err != nil || !valid { - return false - } - - if valid, err := aws.IsS3PathPrefixExternal( - fmt.Sprintf("%s/variables/variables.data-00000-of", path), - ); err != nil || !valid { - return false - } - return true -} diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index 61c98c5035..af0ca915c1 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -25,7 +25,6 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/aws" cr "github.com/cortexlabs/cortex/pkg/lib/configreader" "github.com/cortexlabs/cortex/pkg/lib/errors" - "github.com/cortexlabs/cortex/pkg/lib/models" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/api/resource" ) @@ -78,6 +77,29 @@ var apiValidation = &cr.StructValidation{ }, } +// IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models +// Must contain the following structure: +// - 1523423423/ (version prefix, usually a timestamp) +// - saved_model.pb +// - variables/ +// - variables.index +// - variables.data-00000-of-00001 (there are a variable number of these files) +func IsValidS3Directory(path string) bool { + if valid, err := aws.IsS3PathFileExternal( + fmt.Sprintf("%s/saved_model.pb", path), + fmt.Sprintf("%s/variables/variables.index", path), + ); err != nil || !valid { + return false + } + + if valid, err := aws.IsS3PathPrefixExternal( + fmt.Sprintf("%s/variables/variables.data-00000-of", path), + ); err != nil || !valid { + return false + } + return true +} + func (api *API) UserConfigStr() string { var sb strings.Builder sb.WriteString(api.ResourceFields.UserConfigStr()) @@ -136,7 +158,7 @@ func (api *API) Validate() error { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } case TensorFlowModelFormat: - if !models.IsValidS3Directory(api.Model) { + if !IsValidS3Directory(api.Model) { return errors.Wrap(ErrorInvalidTensorflowDir(api.Model), Identify(api), ModelKey) } default: @@ -146,7 +168,7 @@ func (api *API) Validate() error { if ok, err := aws.IsS3PathFileExternal(api.Model); err != 
nil || !ok { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } - case models.IsValidS3Directory(api.Model): + case IsValidS3Directory(api.Model): api.ModelFormat = TensorFlowModelFormat default: return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) diff --git a/pkg/workloads/cortex/lib/storage/local.py b/pkg/workloads/cortex/lib/storage/local.py index 6e7ce5a12e..39e253503f 100644 --- a/pkg/workloads/cortex/lib/storage/local.py +++ b/pkg/workloads/cortex/lib/storage/local.py @@ -143,4 +143,11 @@ def download_and_unzip(self, key, local_dir): util.extract_zip(local_zip, delete_zip_file=True) def list_objects(self, path): + objects = [] + for root, dirs, files in os.walk(path): + for name in files: + objects.append(os.path.join(root, name)) + for name in dirs: + objects.append(os.path.join(root, name)) + return os.listdir(path) diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 279b09d194..761de6da9b 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -251,9 +251,7 @@ def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): return json.loads(obj.decode("utf-8")) def list_objects(self, path): - bucket, prefix = self.deconstruct_s3_path(path) + _, prefix = self.deconstruct_s3_path(path) return [ - obj["Key"] - for obj in self.s3.list_objects(Bucket=bucket, Prefix=prefix)["Contents"] - if obj["Key"][:1] != "/" + obj[len(prefix) + 1 :] for obj in self.search(prefix=prefix) if not obj.endswith("/") ] diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index fdb097f567..ad2f7a8f57 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -31,6 +31,7 @@ from cortex import consts from cortex.lib import util, tf_lib, package, Context from cortex.lib.log import get_logger +from cortex.lib.storage import S3, LocalStorage from cortex.lib.exceptions import CortexException, UserRuntimeException, UserException from cortex.lib.context import create_transformer_inputs_from_map @@ -445,15 +446,16 @@ def get_signature(app_name, api_name): def download_dir_external(ctx, s3_path, local_path): util.mkdir_p(local_path) bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) - objects = ctx.storage.list_objects(s3_path) + storage_client = S3(bucket_name, client_config={}) + objects = storage_client.list_objects(s3_path) version = prefix.split("/")[-1] + local_path = os.path.join(local_path, version) for obj in objects: - local_key = obj[len(prefix) - len(version) :] - if not os.path.exists(os.path.dirname(local_key)): - util.mkdir_p(os.path.join(local_path, os.path.dirname(local_key))) + if not os.path.exists(os.path.dirname(obj)): + util.mkdir_p(os.path.join(local_path, os.path.dirname(obj))) ctx.storage.download_file_external( - bucket_name + "/" + obj, os.path.join(local_path, local_key) + bucket_name + "/" + os.path.join(prefix, obj), os.path.join(local_path, obj) ) @@ -497,10 +499,13 @@ def start(args): if not os.path.isdir(args.model_dir): if util.is_resource_ref(api["model"]): + model_name = util.get_resource_ref(api["model"]) + model = ctx.models[model_name] ctx.storage.download_and_unzip(model["key"], args.model_dir) else: download_dir_external(ctx, api["model"], args.model_dir) - + local = LocalStorage(base_dir=args.model_dir) + util.logger.info(local.list_objects(args.model_dir)) if args.only_download: return From 2094f43dc58016bf75c2f3e1e284a9f76cbbe1a9 Mon Sep 17 00:00:00 2001 
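PATCH 19 above moves the directory download into `tf_api/api.py`: `download_dir_external` lists every key under the model's S3 prefix and downloads each object into the matching local subpath. A minimal standalone sketch of the same idea, written directly against boto3 rather than the patch's `ctx.storage` wrappers (the bucket, prefix, and local paths are illustrative only, not part of the patch set):

```python
import os
import boto3


def download_s3_prefix(bucket, prefix, local_dir):
    # List every object under the prefix (paginated, since a single
    # ListObjectsV2 call returns at most 1,000 keys) and mirror the key
    # hierarchy under local_dir, skipping directory placeholder keys.
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue
            dest = os.path.join(local_dir, os.path.relpath(key, prefix))
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            s3.download_file(bucket, key, dest)


# e.g. download_s3_prefix("my-bucket", "iris/1560263532", "/mnt/model/1560263532")
```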
From: ivan
Date: Fri, 16 Aug 2019 16:12:21 -0400
Subject: [PATCH 20/26] fix local ls

---
 pkg/workloads/cortex/lib/storage/local.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/workloads/cortex/lib/storage/local.py b/pkg/workloads/cortex/lib/storage/local.py
index 39e253503f..9336bb9e20 100644
--- a/pkg/workloads/cortex/lib/storage/local.py
+++ b/pkg/workloads/cortex/lib/storage/local.py
@@ -150,4 +150,4 @@ def list_objects(self, path):
         for name in dirs:
             objects.append(os.path.join(root, name))
 
-        return os.listdir(path)
+        return objects

From 968c497d5520276cb79f58da43dc817862d1eab6 Mon Sep 17 00:00:00 2001
From: ivan
Date: Fri, 16 Aug 2019 16:12:58 -0400
Subject: [PATCH 21/26] remove testing

---
 pkg/workloads/cortex/tf_api/api.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py
index 3edf07d379..1e945cf256 100644
--- a/pkg/workloads/cortex/tf_api/api.py
+++ b/pkg/workloads/cortex/tf_api/api.py
@@ -517,8 +517,6 @@ def start(args):
             ctx.storage.download_and_unzip(model["key"], args.model_dir)
         else:
             download_dir_external(ctx, api["model"], args.model_dir)
-        local = LocalStorage(base_dir=args.model_dir)
-        util.logger.info(local.list_objects(args.model_dir))
    if args.only_download:
        return

From 1237d61c18a1805740ab26f57f5c0134e3312bdf Mon Sep 17 00:00:00 2001
From: ivan
Date: Fri, 16 Aug 2019 16:13:33 -0400
Subject: [PATCH 22/26] add spacing

---
 pkg/workloads/cortex/tf_api/api.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py
index 1e945cf256..67e5503210 100644
--- a/pkg/workloads/cortex/tf_api/api.py
+++ b/pkg/workloads/cortex/tf_api/api.py
@@ -517,8 +517,9 @@ def start(args):
             ctx.storage.download_and_unzip(model["key"], args.model_dir)
         else:
             download_dir_external(ctx, api["model"], args.model_dir)
-    if args.only_download:
-        return
+
+    if args.only_download:
+        return
 
     if util.is_resource_ref(api["model"]):
         model_name = util.get_resource_ref(api["model"])

From 09c4d443fa633ef3e537ba720f84b1842a0fbc59 Mon Sep 17 00:00:00 2001
From: ivan
Date: Fri, 16 Aug 2019 16:16:51 -0400
Subject: [PATCH 23/26] format

---
 pkg/workloads/cortex/tf_api/api.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py
index 67e5503210..35e0bf59d7 100644
--- a/pkg/workloads/cortex/tf_api/api.py
+++ b/pkg/workloads/cortex/tf_api/api.py
@@ -517,7 +517,7 @@ def start(args):
             ctx.storage.download_and_unzip(model["key"], args.model_dir)
         else:
             download_dir_external(ctx, api["model"], args.model_dir)
-
+
     if args.only_download:
         return

From e3083cef96a8c12988566944093d79cec1584d3c Mon Sep 17 00:00:00 2001
From: ivan
Date: Mon, 19 Aug 2019 00:35:07 -0400
Subject: [PATCH 24/26] remove list_objects

---
 pkg/workloads/cortex/lib/storage/local.py |  10 -
 pkg/workloads/cortex/lib/storage/s3.py    | 450 +++++++++++-----------
 pkg/workloads/cortex/tf_api/api.py        |  12 +-
 3 files changed, 228 insertions(+), 244 deletions(-)

diff --git a/pkg/workloads/cortex/lib/storage/local.py b/pkg/workloads/cortex/lib/storage/local.py
index 9336bb9e20..51f91d1b94 100644
--- a/pkg/workloads/cortex/lib/storage/local.py
+++ b/pkg/workloads/cortex/lib/storage/local.py
@@ -141,13 +141,3 @@ def download_and_unzip(self, key, local_dir):
         local_zip = os.path.join(local_dir, "zip.zip")
         self.download_file(key, local_zip)
         util.extract_zip(local_zip, delete_zip_file=True)
-
-    def list_objects(self,
path): - objects = [] - for root, dirs, files in os.walk(path): - for name in files: - objects.append(os.path.join(root, name)) - for name in dirs: - objects.append(os.path.join(root, name)) - - return objects diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index 761de6da9b..a738109a81 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -26,232 +26,226 @@ class S3(object): - def __init__(self, bucket=None, region=None, client_config={}): - self.bucket = bucket - self.region = region - - default_config = { - "use_ssl": True, - "verify": True, - "region_name": None, - "aws_access_key_id": None, - "aws_secret_access_key": None, - "aws_session_token": None, - } - - if client_config is None: - client_config = {} - - if region is not None: - client_config["region_name"] = region - - merged_client_config = util.merge_dicts_in_place_no_overwrite(client_config, default_config) - - self.s3 = boto3.client("s3", **client_config) - - @staticmethod - def deconstruct_s3_path(s3_path): - path = util.remove_prefix_if_present(s3_path, "s3://") - bucket = path.split("/")[0] - key = os.path.join(*path.split("/")[1:]) - return (bucket, key) - - def hadoop_path(self, key): - return os.path.join("s3a://", self.bucket, key) - - def blob_path(self, key): - return os.path.join("s3://", self.bucket, key) - - def _get_dir(self, prefix, local_dir): - prefix = util.add_suffix_unless_present(prefix, "/") - util.mkdir_p(local_dir) - for key in self._get_matching_s3_keys_generator(prefix): - rel_path = util.remove_prefix_if_present(key, prefix) - local_dest_path = os.path.join(local_dir, rel_path) - self.download_file(key, local_dest_path) - - def _file_exists(self, key): - try: - self.s3.head_object(Bucket=self.bucket, Key=key) - return True - except botocore.exceptions.ClientError as e: - if e.response["Error"]["Code"] == "404": - return False - else: - raise - - def _is_s3_prefix(self, prefix): - response = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=prefix) - return response["KeyCount"] > 0 - - def _is_s3_dir(self, dir_path): - prefix = util.add_suffix_unless_present(dir_path, "/") - return self._is_s3_prefix(prefix) - - def _get_matching_s3_objects_generator(self, prefix="", suffix=""): - kwargs = {"Bucket": self.bucket, "Prefix": prefix} - - while True: - resp = self.s3.list_objects_v2(**kwargs) - try: - contents = resp["Contents"] - except KeyError: - return - - for obj in contents: - key = obj["Key"] - if key.startswith(prefix) and key.endswith(suffix): - yield obj - - try: - kwargs["ContinuationToken"] = resp["NextContinuationToken"] - except KeyError: - break - - def _get_matching_s3_keys_generator(self, prefix="", suffix=""): - for obj in self._get_matching_s3_objects_generator(prefix, suffix): - yield obj["Key"] - - def _upload_string_to_s3(self, string, key): - self.s3.put_object(Bucket=self.bucket, Key=key, Body=string) - - def _read_bytes_from_s3( - self, key, allow_missing=False, ext_bucket=None, num_retries=0, retry_delay_sec=2 - ): - while True: - try: - return self._read_bytes_from_s3_single( - key, allow_missing=allow_missing, ext_bucket=ext_bucket - ) - except: - if num_retries <= 0: - raise - num_retries -= 1 - time.sleep(retry_delay_sec) - - def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None): - bucket = self.bucket - if ext_bucket is not None: - bucket = ext_bucket - - try: - try: - byte_array = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read() - except 
self.s3.exceptions.NoSuchKey: - if allow_missing: - return None - raise - except Exception as e: - raise CortexException( - 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) - + "it may not exist, or you may not have suffienct permissions" - ) from e - - return byte_array.strip() - - def search(self, prefix="", suffix=""): - return list(self._get_matching_s3_keys_generator(prefix, suffix)) - - def put_json(self, obj, key): - self._upload_string_to_s3(json.dumps(obj), key) - - def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): - obj = self._read_bytes_from_s3( - key, - allow_missing=allow_missing, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, + def __init__(self, bucket=None, region=None, client_config={}): + self.bucket = bucket + self.region = region + + default_config = { + "use_ssl": True, + "verify": True, + "region_name": None, + "aws_access_key_id": None, + "aws_secret_access_key": None, + "aws_session_token": None, + } + + if client_config is None: + client_config = {} + + if region is not None: + client_config["region_name"] = region + + merged_client_config = util.merge_dicts_in_place_no_overwrite(client_config, default_config) + + self.s3 = boto3.client("s3", **client_config) + + @staticmethod + def deconstruct_s3_path(s3_path): + path = util.remove_prefix_if_present(s3_path, "s3://") + bucket = path.split("/")[0] + key = os.path.join(*path.split("/")[1:]) + return (bucket, key) + + def hadoop_path(self, key): + return os.path.join("s3a://", self.bucket, key) + + def blob_path(self, key): + return os.path.join("s3://", self.bucket, key) + + def _get_dir(self, prefix, local_dir): + prefix = util.add_suffix_unless_present(prefix, "/") + util.mkdir_p(local_dir) + for key in self._get_matching_s3_keys_generator(prefix): + rel_path = util.remove_prefix_if_present(key, prefix) + local_dest_path = os.path.join(local_dir, rel_path) + self.download_file(key, local_dest_path) + + def _file_exists(self, key): + try: + self.s3.head_object(Bucket=self.bucket, Key=key) + return True + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + return False + else: + raise + + def _is_s3_prefix(self, prefix): + response = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=prefix) + return response["KeyCount"] > 0 + + def _is_s3_dir(self, dir_path): + prefix = util.add_suffix_unless_present(dir_path, "/") + return self._is_s3_prefix(prefix) + + def _get_matching_s3_objects_generator(self, prefix="", suffix=""): + kwargs = {"Bucket": self.bucket, "Prefix": prefix} + + while True: + resp = self.s3.list_objects_v2(**kwargs) + try: + contents = resp["Contents"] + except KeyError: + return + + for obj in contents: + key = obj["Key"] + if key.startswith(prefix) and key.endswith(suffix): + yield obj + + try: + kwargs["ContinuationToken"] = resp["NextContinuationToken"] + except KeyError: + break + + def _get_matching_s3_keys_generator(self, prefix="", suffix=""): + for obj in self._get_matching_s3_objects_generator(prefix, suffix): + yield obj["Key"] + + def _upload_string_to_s3(self, string, key): + self.s3.put_object(Bucket=self.bucket, Key=key, Body=string) + + def _read_bytes_from_s3( + self, key, allow_missing=False, ext_bucket=None, num_retries=0, retry_delay_sec=2 + ): + while True: + try: + return self._read_bytes_from_s3_single( + key, allow_missing=allow_missing, ext_bucket=ext_bucket ) - if obj is None: - return None - return json.loads(obj.decode("utf-8")) - - def put_msgpack(self, obj, key): - 
self._upload_string_to_s3(msgpack.dumps(obj), key) - - def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): - obj = self._read_bytes_from_s3( - key, - allow_missing=allow_missing, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj == None: - return None - return msgpack.loads(obj, raw=False) - - def put_pyobj(self, obj, key): - self._upload_string_to_s3(pickle.dumps(obj), key) - - def get_pyobj(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): - obj = self._read_bytes_from_s3( - key, - allow_missing=allow_missing, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj is None: - return None - return pickle.loads(obj) - - def upload_file(self, local_path, key): - self.s3.upload_file(local_path, self.bucket, key) - - def download_file(self, key, local_path): - util.mkdir_p(os.path.dirname(local_path)) - try: - self.s3.download_file(self.bucket, key, local_path) - return local_path - except Exception as e: - raise CortexException( - 'key "{}" in bucket "{}" could not be accessed; '.format(key, self.bucket) - + "it may not exist, or you may not have suffienct permissions" - ) from e - - def zip_and_upload(self, local_path, key): - util.zip_dir(local_path, "temp.zip") - self.s3.upload_file("temp.zip", self.bucket, key) - util.rm_file("temp.zip") - - def download_and_unzip(self, key, local_dir): - util.mkdir_p(local_dir) - local_zip = os.path.join(local_dir, "zip.zip") - self.download_file(key, local_zip) - util.extract_zip(local_zip, delete_zip_file=True) - - def download_and_unzip_external(self, s3_path, local_dir): - util.mkdir_p(local_dir) - local_zip = os.path.join(local_dir, "zip.zip") - self.download_file_external(s3_path, local_zip) - util.extract_zip(local_zip, delete_zip_file=True) - - def download_file_external(self, s3_path, local_path): - util.mkdir_p(os.path.dirname(local_path)) - bucket, key = self.deconstruct_s3_path(s3_path) - try: - self.s3.download_file(bucket, key, local_path) - return local_path - except Exception as e: - raise CortexException( - 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) - + "it may not exist, or you may not have suffienct permissions" - ) from e - - def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): - bucket, key = self.deconstruct_s3_path(s3_path) - obj = self._read_bytes_from_s3( - key, - allow_missing=False, - ext_bucket=bucket, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj is None: - return None - return json.loads(obj.decode("utf-8")) - - def list_objects(self, path): - _, prefix = self.deconstruct_s3_path(path) - return [ - obj[len(prefix) + 1 :] for obj in self.search(prefix=prefix) if not obj.endswith("/") - ] + except: + if num_retries <= 0: + raise + num_retries -= 1 + time.sleep(retry_delay_sec) + + def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None): + bucket = self.bucket + if ext_bucket is not None: + bucket = ext_bucket + + try: + try: + byte_array = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read() + except self.s3.exceptions.NoSuchKey: + if allow_missing: + return None + raise + except Exception as e: + raise CortexException( + 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) + + "it may not exist, or you may not have suffienct permissions" + ) from e + + return byte_array.strip() + + def search(self, prefix="", suffix=""): + return list(self._get_matching_s3_keys_generator(prefix, suffix)) + + def put_json(self, obj, 
key): + self._upload_string_to_s3(json.dumps(obj), key) + + def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): + obj = self._read_bytes_from_s3( + key, + allow_missing=allow_missing, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj is None: + return None + return json.loads(obj.decode("utf-8")) + + def put_msgpack(self, obj, key): + self._upload_string_to_s3(msgpack.dumps(obj), key) + + def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): + obj = self._read_bytes_from_s3( + key, + allow_missing=allow_missing, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj == None: + return None + return msgpack.loads(obj, raw=False) + + def put_pyobj(self, obj, key): + self._upload_string_to_s3(pickle.dumps(obj), key) + + def get_pyobj(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): + obj = self._read_bytes_from_s3( + key, + allow_missing=allow_missing, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj is None: + return None + return pickle.loads(obj) + + def upload_file(self, local_path, key): + self.s3.upload_file(local_path, self.bucket, key) + + def download_file(self, key, local_path): + util.mkdir_p(os.path.dirname(local_path)) + try: + self.s3.download_file(self.bucket, key, local_path) + return local_path + except Exception as e: + raise CortexException( + 'key "{}" in bucket "{}" could not be accessed; '.format(key, self.bucket) + + "it may not exist, or you may not have suffienct permissions" + ) from e + + def zip_and_upload(self, local_path, key): + util.zip_dir(local_path, "temp.zip") + self.s3.upload_file("temp.zip", self.bucket, key) + util.rm_file("temp.zip") + + def download_and_unzip(self, key, local_dir): + util.mkdir_p(local_dir) + local_zip = os.path.join(local_dir, "zip.zip") + self.download_file(key, local_zip) + util.extract_zip(local_zip, delete_zip_file=True) + + def download_and_unzip_external(self, s3_path, local_dir): + util.mkdir_p(local_dir) + local_zip = os.path.join(local_dir, "zip.zip") + self.download_file_external(s3_path, local_zip) + util.extract_zip(local_zip, delete_zip_file=True) + + def download_file_external(self, s3_path, local_path): + util.mkdir_p(os.path.dirname(local_path)) + bucket, key = self.deconstruct_s3_path(s3_path) + try: + self.s3.download_file(bucket, key, local_path) + return local_path + except Exception as e: + raise CortexException( + 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) + + "it may not exist, or you may not have suffienct permissions" + ) from e + + def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): + bucket, key = self.deconstruct_s3_path(s3_path) + obj = self._read_bytes_from_s3( + key, + allow_missing=False, + ext_bucket=bucket, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj is None: + return None + return json.loads(obj.decode("utf-8")) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 35e0bf59d7..c5d63408f8 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -446,8 +446,8 @@ def download_dir_external(ctx, s3_path, local_path): util.mkdir_p(local_path) bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) storage_client = S3(bucket_name, client_config={}) - objects = storage_client.list_objects(s3_path) - version = prefix.split("/")[-1] + objects = [obj[len(prefix) + 1:] for obj in 
storage_client.search(prefix=prefix)] + version = prefix.rstrip("/").split("/")[-1] local_path = os.path.join(local_path, version) for obj in objects: if not os.path.exists(os.path.dirname(obj)): @@ -509,7 +509,7 @@ def start(args): if api.get("request_handler") is not None: local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) - + if not os.path.isdir(args.model_dir): if util.is_resource_ref(api["model"]): model_name = util.get_resource_ref(api["model"]) @@ -518,9 +518,9 @@ def start(args): else: download_dir_external(ctx, api["model"], args.model_dir) - if args.only_download: - return - + if args.only_download: + return + if util.is_resource_ref(api["model"]): model_name = util.get_resource_ref(api["model"]) model = ctx.models[model_name] From 252bb4074166d07640d05ab5b9ad24796a96a00e Mon Sep 17 00:00:00 2001 From: ivan Date: Mon, 19 Aug 2019 00:37:30 -0400 Subject: [PATCH 25/26] format --- pkg/workloads/cortex/lib/storage/s3.py | 444 ++++++++++++------------- pkg/workloads/cortex/tf_api/api.py | 6 +- 2 files changed, 225 insertions(+), 225 deletions(-) diff --git a/pkg/workloads/cortex/lib/storage/s3.py b/pkg/workloads/cortex/lib/storage/s3.py index a738109a81..dcaa612008 100644 --- a/pkg/workloads/cortex/lib/storage/s3.py +++ b/pkg/workloads/cortex/lib/storage/s3.py @@ -26,226 +26,226 @@ class S3(object): - def __init__(self, bucket=None, region=None, client_config={}): - self.bucket = bucket - self.region = region - - default_config = { - "use_ssl": True, - "verify": True, - "region_name": None, - "aws_access_key_id": None, - "aws_secret_access_key": None, - "aws_session_token": None, - } - - if client_config is None: - client_config = {} - - if region is not None: - client_config["region_name"] = region - - merged_client_config = util.merge_dicts_in_place_no_overwrite(client_config, default_config) - - self.s3 = boto3.client("s3", **client_config) - - @staticmethod - def deconstruct_s3_path(s3_path): - path = util.remove_prefix_if_present(s3_path, "s3://") - bucket = path.split("/")[0] - key = os.path.join(*path.split("/")[1:]) - return (bucket, key) - - def hadoop_path(self, key): - return os.path.join("s3a://", self.bucket, key) - - def blob_path(self, key): - return os.path.join("s3://", self.bucket, key) - - def _get_dir(self, prefix, local_dir): - prefix = util.add_suffix_unless_present(prefix, "/") - util.mkdir_p(local_dir) - for key in self._get_matching_s3_keys_generator(prefix): - rel_path = util.remove_prefix_if_present(key, prefix) - local_dest_path = os.path.join(local_dir, rel_path) - self.download_file(key, local_dest_path) - - def _file_exists(self, key): - try: - self.s3.head_object(Bucket=self.bucket, Key=key) - return True - except botocore.exceptions.ClientError as e: - if e.response["Error"]["Code"] == "404": - return False - else: - raise - - def _is_s3_prefix(self, prefix): - response = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=prefix) - return response["KeyCount"] > 0 - - def _is_s3_dir(self, dir_path): - prefix = util.add_suffix_unless_present(dir_path, "/") - return self._is_s3_prefix(prefix) - - def _get_matching_s3_objects_generator(self, prefix="", suffix=""): - kwargs = {"Bucket": self.bucket, "Prefix": prefix} - - while True: - resp = self.s3.list_objects_v2(**kwargs) - try: - contents = resp["Contents"] - except KeyError: - return - - for obj in contents: - key = obj["Key"] - if key.startswith(prefix) and key.endswith(suffix): - yield obj - - try: - kwargs["ContinuationToken"] = resp["NextContinuationToken"] - except 
KeyError: - break - - def _get_matching_s3_keys_generator(self, prefix="", suffix=""): - for obj in self._get_matching_s3_objects_generator(prefix, suffix): - yield obj["Key"] - - def _upload_string_to_s3(self, string, key): - self.s3.put_object(Bucket=self.bucket, Key=key, Body=string) - - def _read_bytes_from_s3( - self, key, allow_missing=False, ext_bucket=None, num_retries=0, retry_delay_sec=2 - ): - while True: - try: - return self._read_bytes_from_s3_single( - key, allow_missing=allow_missing, ext_bucket=ext_bucket + def __init__(self, bucket=None, region=None, client_config={}): + self.bucket = bucket + self.region = region + + default_config = { + "use_ssl": True, + "verify": True, + "region_name": None, + "aws_access_key_id": None, + "aws_secret_access_key": None, + "aws_session_token": None, + } + + if client_config is None: + client_config = {} + + if region is not None: + client_config["region_name"] = region + + merged_client_config = util.merge_dicts_in_place_no_overwrite(client_config, default_config) + + self.s3 = boto3.client("s3", **client_config) + + @staticmethod + def deconstruct_s3_path(s3_path): + path = util.remove_prefix_if_present(s3_path, "s3://") + bucket = path.split("/")[0] + key = os.path.join(*path.split("/")[1:]) + return (bucket, key) + + def hadoop_path(self, key): + return os.path.join("s3a://", self.bucket, key) + + def blob_path(self, key): + return os.path.join("s3://", self.bucket, key) + + def _get_dir(self, prefix, local_dir): + prefix = util.add_suffix_unless_present(prefix, "/") + util.mkdir_p(local_dir) + for key in self._get_matching_s3_keys_generator(prefix): + rel_path = util.remove_prefix_if_present(key, prefix) + local_dest_path = os.path.join(local_dir, rel_path) + self.download_file(key, local_dest_path) + + def _file_exists(self, key): + try: + self.s3.head_object(Bucket=self.bucket, Key=key) + return True + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + return False + else: + raise + + def _is_s3_prefix(self, prefix): + response = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=prefix) + return response["KeyCount"] > 0 + + def _is_s3_dir(self, dir_path): + prefix = util.add_suffix_unless_present(dir_path, "/") + return self._is_s3_prefix(prefix) + + def _get_matching_s3_objects_generator(self, prefix="", suffix=""): + kwargs = {"Bucket": self.bucket, "Prefix": prefix} + + while True: + resp = self.s3.list_objects_v2(**kwargs) + try: + contents = resp["Contents"] + except KeyError: + return + + for obj in contents: + key = obj["Key"] + if key.startswith(prefix) and key.endswith(suffix): + yield obj + + try: + kwargs["ContinuationToken"] = resp["NextContinuationToken"] + except KeyError: + break + + def _get_matching_s3_keys_generator(self, prefix="", suffix=""): + for obj in self._get_matching_s3_objects_generator(prefix, suffix): + yield obj["Key"] + + def _upload_string_to_s3(self, string, key): + self.s3.put_object(Bucket=self.bucket, Key=key, Body=string) + + def _read_bytes_from_s3( + self, key, allow_missing=False, ext_bucket=None, num_retries=0, retry_delay_sec=2 + ): + while True: + try: + return self._read_bytes_from_s3_single( + key, allow_missing=allow_missing, ext_bucket=ext_bucket + ) + except: + if num_retries <= 0: + raise + num_retries -= 1 + time.sleep(retry_delay_sec) + + def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None): + bucket = self.bucket + if ext_bucket is not None: + bucket = ext_bucket + + try: + try: + byte_array = 
self.s3.get_object(Bucket=bucket, Key=key)["Body"].read() + except self.s3.exceptions.NoSuchKey: + if allow_missing: + return None + raise + except Exception as e: + raise CortexException( + 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) + + "it may not exist, or you may not have suffienct permissions" + ) from e + + return byte_array.strip() + + def search(self, prefix="", suffix=""): + return list(self._get_matching_s3_keys_generator(prefix, suffix)) + + def put_json(self, obj, key): + self._upload_string_to_s3(json.dumps(obj), key) + + def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): + obj = self._read_bytes_from_s3( + key, + allow_missing=allow_missing, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, ) - except: - if num_retries <= 0: - raise - num_retries -= 1 - time.sleep(retry_delay_sec) - - def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None): - bucket = self.bucket - if ext_bucket is not None: - bucket = ext_bucket - - try: - try: - byte_array = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read() - except self.s3.exceptions.NoSuchKey: - if allow_missing: - return None - raise - except Exception as e: - raise CortexException( - 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) - + "it may not exist, or you may not have suffienct permissions" - ) from e - - return byte_array.strip() - - def search(self, prefix="", suffix=""): - return list(self._get_matching_s3_keys_generator(prefix, suffix)) - - def put_json(self, obj, key): - self._upload_string_to_s3(json.dumps(obj), key) - - def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): - obj = self._read_bytes_from_s3( - key, - allow_missing=allow_missing, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj is None: - return None - return json.loads(obj.decode("utf-8")) - - def put_msgpack(self, obj, key): - self._upload_string_to_s3(msgpack.dumps(obj), key) - - def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): - obj = self._read_bytes_from_s3( - key, - allow_missing=allow_missing, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj == None: - return None - return msgpack.loads(obj, raw=False) - - def put_pyobj(self, obj, key): - self._upload_string_to_s3(pickle.dumps(obj), key) - - def get_pyobj(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): - obj = self._read_bytes_from_s3( - key, - allow_missing=allow_missing, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj is None: - return None - return pickle.loads(obj) - - def upload_file(self, local_path, key): - self.s3.upload_file(local_path, self.bucket, key) - - def download_file(self, key, local_path): - util.mkdir_p(os.path.dirname(local_path)) - try: - self.s3.download_file(self.bucket, key, local_path) - return local_path - except Exception as e: - raise CortexException( - 'key "{}" in bucket "{}" could not be accessed; '.format(key, self.bucket) - + "it may not exist, or you may not have suffienct permissions" - ) from e - - def zip_and_upload(self, local_path, key): - util.zip_dir(local_path, "temp.zip") - self.s3.upload_file("temp.zip", self.bucket, key) - util.rm_file("temp.zip") - - def download_and_unzip(self, key, local_dir): - util.mkdir_p(local_dir) - local_zip = os.path.join(local_dir, "zip.zip") - self.download_file(key, local_zip) - util.extract_zip(local_zip, delete_zip_file=True) - - def 
download_and_unzip_external(self, s3_path, local_dir): - util.mkdir_p(local_dir) - local_zip = os.path.join(local_dir, "zip.zip") - self.download_file_external(s3_path, local_zip) - util.extract_zip(local_zip, delete_zip_file=True) - - def download_file_external(self, s3_path, local_path): - util.mkdir_p(os.path.dirname(local_path)) - bucket, key = self.deconstruct_s3_path(s3_path) - try: - self.s3.download_file(bucket, key, local_path) - return local_path - except Exception as e: - raise CortexException( - 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) - + "it may not exist, or you may not have suffienct permissions" - ) from e - - def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): - bucket, key = self.deconstruct_s3_path(s3_path) - obj = self._read_bytes_from_s3( - key, - allow_missing=False, - ext_bucket=bucket, - num_retries=num_retries, - retry_delay_sec=retry_delay_sec, - ) - if obj is None: - return None - return json.loads(obj.decode("utf-8")) + if obj is None: + return None + return json.loads(obj.decode("utf-8")) + + def put_msgpack(self, obj, key): + self._upload_string_to_s3(msgpack.dumps(obj), key) + + def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): + obj = self._read_bytes_from_s3( + key, + allow_missing=allow_missing, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj == None: + return None + return msgpack.loads(obj, raw=False) + + def put_pyobj(self, obj, key): + self._upload_string_to_s3(pickle.dumps(obj), key) + + def get_pyobj(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2): + obj = self._read_bytes_from_s3( + key, + allow_missing=allow_missing, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj is None: + return None + return pickle.loads(obj) + + def upload_file(self, local_path, key): + self.s3.upload_file(local_path, self.bucket, key) + + def download_file(self, key, local_path): + util.mkdir_p(os.path.dirname(local_path)) + try: + self.s3.download_file(self.bucket, key, local_path) + return local_path + except Exception as e: + raise CortexException( + 'key "{}" in bucket "{}" could not be accessed; '.format(key, self.bucket) + + "it may not exist, or you may not have suffienct permissions" + ) from e + + def zip_and_upload(self, local_path, key): + util.zip_dir(local_path, "temp.zip") + self.s3.upload_file("temp.zip", self.bucket, key) + util.rm_file("temp.zip") + + def download_and_unzip(self, key, local_dir): + util.mkdir_p(local_dir) + local_zip = os.path.join(local_dir, "zip.zip") + self.download_file(key, local_zip) + util.extract_zip(local_zip, delete_zip_file=True) + + def download_and_unzip_external(self, s3_path, local_dir): + util.mkdir_p(local_dir) + local_zip = os.path.join(local_dir, "zip.zip") + self.download_file_external(s3_path, local_zip) + util.extract_zip(local_zip, delete_zip_file=True) + + def download_file_external(self, s3_path, local_path): + util.mkdir_p(os.path.dirname(local_path)) + bucket, key = self.deconstruct_s3_path(s3_path) + try: + self.s3.download_file(bucket, key, local_path) + return local_path + except Exception as e: + raise CortexException( + 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket) + + "it may not exist, or you may not have suffienct permissions" + ) from e + + def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2): + bucket, key = self.deconstruct_s3_path(s3_path) + obj = self._read_bytes_from_s3( + key, + allow_missing=False, + 
ext_bucket=bucket, + num_retries=num_retries, + retry_delay_sec=retry_delay_sec, + ) + if obj is None: + return None + return json.loads(obj.decode("utf-8")) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 244809e006..f33a89ee5e 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -442,7 +442,7 @@ def download_dir_external(ctx, s3_path, local_path): util.mkdir_p(local_path) bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) storage_client = S3(bucket_name, client_config={}) - objects = [obj[len(prefix) + 1:] for obj in storage_client.search(prefix=prefix)] + objects = [obj[len(prefix) + 1 :] for obj in storage_client.search(prefix=prefix)] version = prefix.rstrip("/").split("/")[-1] local_path = os.path.join(local_path, version) for obj in objects: @@ -505,7 +505,7 @@ def start(args): if api.get("request_handler") is not None: local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) - + if not os.path.isdir(args.model_dir): if util.is_resource_ref(api["model"]): model_name = util.get_resource_ref(api["model"]) @@ -516,7 +516,7 @@ def start(args): if args.only_download: return - + if util.is_resource_ref(api["model"]): model_name = util.get_resource_ref(api["model"]) model = ctx.models[model_name] From 2b27b885a41fd8b7a5aa186f17e1c8ef43f110e4 Mon Sep 17 00:00:00 2001 From: ivan Date: Mon, 19 Aug 2019 09:38:50 -0400 Subject: [PATCH 26/26] address comment --- pkg/workloads/cortex/tf_api/api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index f33a89ee5e..23edf4e915 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -443,7 +443,8 @@ def download_dir_external(ctx, s3_path, local_path): bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) storage_client = S3(bucket_name, client_config={}) objects = [obj[len(prefix) + 1 :] for obj in storage_client.search(prefix=prefix)] - version = prefix.rstrip("/").split("/")[-1] + prefix = prefix + "/" if prefix[-1] != "/" else prefix + version = prefix.split("/")[-2] local_path = os.path.join(local_path, version) for obj in objects: if not os.path.exists(os.path.dirname(obj)):
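
The version-directory handling that PATCH 26 settles on reads most clearly in isolation. Below is a minimal standalone sketch, assuming a non-empty S3 key prefix; the helper name extract_version is hypothetical, but its two-line body mirrors the logic that download_dir_external ends up with:

def extract_version(prefix):
    # Normalize to a trailing slash so "iris/1565392692" and
    # "iris/1565392692/" behave identically (assumes a non-empty
    # prefix, as the surrounding code already does).
    prefix = prefix + "/" if prefix[-1] != "/" else prefix
    # With the trailing slash guaranteed, the version directory is
    # always the second-to-last "/"-separated segment.
    return prefix.split("/")[-2]

assert extract_version("iris/1565392692") == "1565392692"
assert extract_version("iris/1565392692/") == "1565392692"

Normalizing the slash before splitting is what lets both spellings of a model path resolve to the same TensorFlow Serving version directory under local_path.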