diff --git a/docs/deployments/packaging-models.md b/docs/deployments/packaging-models.md index f210f9913c..61f590231c 100644 --- a/docs/deployments/packaging-models.md +++ b/docs/deployments/packaging-models.md @@ -20,15 +20,12 @@ train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=1000) eval_spec = tf.estimator.EvalSpec(eval_input_fn, exporters=[exporter], name="estimator-eval") tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec) - -# zip the estimator export dir (the exported path looks like iris/export/estimator/1562353043/) -shutil.make_archive("tensorflow", "zip", os.path.join("iris/export/estimator")) ``` -Upload the zipped file to Amazon S3 using the AWS web console or CLI: +Upload the exported version directory to Amazon S3 using the AWS web console or CLI: ```text -$ aws s3 cp model.zip s3://my-bucket/model.zip +$ aws s3 sync ./iris/export/estimator/156293432 s3://my-bucket/iris/156293432 ``` Reference your model in an `api`: @@ -36,7 +33,7 @@ Reference your model in an `api`: ```yaml - kind: api name: my-api - model: s3://my-bucket/model.zip + model: s3://my-bucket/iris/156293432 ``` ## ONNX diff --git a/examples/iris/cortex.yaml b/examples/iris/cortex.yaml index ba286dbaff..236a3579a0 100644 --- a/examples/iris/cortex.yaml +++ b/examples/iris/cortex.yaml @@ -3,7 +3,7 @@ - kind: api name: tensorflow - model: s3://cortex-examples/iris/tensorflow.zip + model: s3://cortex-examples/iris/tensorflow/1560263532 request_handler: handlers/tensorflow.py - kind: api diff --git a/examples/iris/models/tensorflow_model.py b/examples/iris/models/tensorflow_model.py index 5e197be9bf..87e6d654dc 100644 --- a/examples/iris/models/tensorflow_model.py +++ b/examples/iris/models/tensorflow_model.py @@ -88,7 +88,3 @@ def my_model(features, labels, mode, params): # zip the estimator export dir (the exported path looks like iris_tf_export/export/estimator/1562353043/) estimator_dir = EXPORT_DIR + "/export/estimator" -shutil.make_archive("tensorflow", "zip", os.path.join(estimator_dir)) - -# clean up -shutil.rmtree(EXPORT_DIR) diff --git a/examples/sentiment/cortex.yaml b/examples/sentiment/cortex.yaml index 1c638c657b..431d0aee88 100644 --- a/examples/sentiment/cortex.yaml +++ b/examples/sentiment/cortex.yaml @@ -3,5 +3,5 @@ - kind: api name: classifier - model: s3://cortex-examples/sentiment/bert.zip + model: s3://cortex-examples/sentiment/1565392692 request_handler: sentiment.py diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index 93b750aa5e..fd15da4db9 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -318,7 +318,7 @@ func IsS3PrefixExternal(bucket string, prefix string) (bool, error) { return hasPrefix, nil } -func IsS3FileExternal(bucket string, key string) (bool, error) { +func IsS3FileExternal(bucket string, keys ...string) (bool, error) { region, err := GetBucketRegion(bucket) if err != nil { return false, err @@ -328,17 +328,19 @@ func IsS3FileExternal(bucket string, key string) (bool, error) { Region: aws.String(region), })) - _, err = s3.New(sess).HeadObject(&s3.HeadObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) + for _, key := range keys { + _, err = s3.New(sess).HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) - if IsNotFoundErr(err) { - return false, nil - } + if IsNotFoundErr(err) { + return false, nil + } - if err != nil { - return false, errors.Wrap(err, bucket, key) + if err != nil { + return false, errors.Wrap(err, bucket, key) + } } return true, nil @@ -352,11 +354,29 @@ func IsS3aPathPrefixExternal(s3aPath string) (bool, error) { return IsS3PrefixExternal(bucket, prefix) } -func IsS3PathFileExternal(s3Path string) (bool, error) { - bucket, key, err := SplitS3Path(s3Path) +func IsS3PathPrefixExternal(s3Path string) (bool, error) { + bucket, prefix, err := SplitS3Path(s3Path) if err != nil { return false, err } + return IsS3PrefixExternal(bucket, prefix) +} + +func IsS3PathFileExternal(s3Paths ...string) (bool, error) { + for _, s3Path := range s3Paths { + bucket, key, err := SplitS3Path(s3Path) + if err != nil { + return false, err + } + exists, err := IsS3FileExternal(bucket, key) + if err != nil { + return false, err + } - return IsS3FileExternal(bucket, key) + if !exists { + return false, nil + } + } + + return true, nil } diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index a2fc5564f5..af0ca915c1 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -77,6 +77,29 @@ var apiValidation = &cr.StructValidation{ }, } +// IsValidS3Directory checks that the path contains a valid S3 directory for Tensorflow models +// Must contain the following structure: +// - 1523423423/ (version prefix, usually a timestamp) +// - saved_model.pb +// - variables/ +// - variables.index +// - variables.data-00000-of-00001 (there are a variable number of these files) +func IsValidS3Directory(path string) bool { + if valid, err := aws.IsS3PathFileExternal( + fmt.Sprintf("%s/saved_model.pb", path), + fmt.Sprintf("%s/variables/variables.index", path), + ); err != nil || !valid { + return false + } + + if valid, err := aws.IsS3PathPrefixExternal( + fmt.Sprintf("%s/variables/variables.data-00000-of", path), + ); err != nil || !valid { + return false + } + return true +} + func (api *API) UserConfigStr() string { var sb strings.Builder sb.WriteString(api.ResourceFields.UserConfigStr()) @@ -118,24 +141,39 @@ func (apis APIs) Validate() error { func (api *API) Validate() error { if yaml.StartsWithEscapedAtSymbol(api.Model) { api.ModelFormat = TensorFlowModelFormat - } else { - if !aws.IsValidS3Path(api.Model) { - return errors.Wrap(ErrorInvalidS3PathOrResourceReference(api.Model), Identify(api), ModelKey) + if err := api.Compute.Validate(); err != nil { + return errors.Wrap(err, Identify(api), ComputeKey) } - if api.ModelFormat == UnknownModelFormat { - if strings.HasSuffix(api.Model, ".onnx") { - api.ModelFormat = ONNXModelFormat - } else if strings.HasSuffix(api.Model, ".zip") { - api.ModelFormat = TensorFlowModelFormat - } else { - return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) - } - } + return nil + } + + if !aws.IsValidS3Path(api.Model) { + return errors.Wrap(ErrorInvalidS3PathOrResourceReference(api.Model), Identify(api), ModelKey) + } + switch api.ModelFormat { + case ONNXModelFormat: if ok, err := aws.IsS3PathFileExternal(api.Model); err != nil || !ok { return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) } + case TensorFlowModelFormat: + if !IsValidS3Directory(api.Model) { + return errors.Wrap(ErrorInvalidTensorflowDir(api.Model), Identify(api), ModelKey) + } + default: + switch { + case strings.HasSuffix(api.Model, ".onnx"): + api.ModelFormat = ONNXModelFormat + if ok, err := aws.IsS3PathFileExternal(api.Model); err != nil || !ok { + return errors.Wrap(ErrorExternalNotFound(api.Model), Identify(api), ModelKey) + } + case IsValidS3Directory(api.Model): + api.ModelFormat = TensorFlowModelFormat + default: + return errors.Wrap(ErrorUnableToInferModelFormat(), Identify(api)) + } + } if err := api.Compute.Validate(); err != nil { diff --git a/pkg/operator/api/userconfig/config.go b/pkg/operator/api/userconfig/config.go index 55a3d4e408..146fa6afa8 100644 --- a/pkg/operator/api/userconfig/config.go +++ b/pkg/operator/api/userconfig/config.go @@ -154,11 +154,6 @@ func (config *Config) ValidatePartial() error { } func (config *Config) Validate(envName string) error { - err := config.ValidatePartial() - if err != nil { - return err - } - if config.App == nil { return ErrorMissingAppDefinition() } diff --git a/pkg/operator/api/userconfig/errors.go b/pkg/operator/api/userconfig/errors.go index 62ff9cad25..3e5f20821f 100644 --- a/pkg/operator/api/userconfig/errors.go +++ b/pkg/operator/api/userconfig/errors.go @@ -80,6 +80,7 @@ const ( ErrInvalidS3PathOrResourceReference ErrUnableToInferModelFormat ErrExternalNotFound + ErrInvalidTensorflowDir ) var errorKinds = []string{ @@ -133,9 +134,10 @@ var errorKinds = []string{ "err_invalid_s3_path_or_resource_reference", "err_unable_to_infer_model_format", "err_external_not_found", + "err_invalid_tensorflow_dir", } -var _ = [1]int{}[int(ErrExternalNotFound)-(len(errorKinds)-1)] // Ensure list length matches +var _ = [1]int{}[int(ErrInvalidTensorflowDir)-(len(errorKinds)-1)] // Ensure list length matches func (t ErrorKind) String() string { return errorKinds[t] @@ -599,10 +601,30 @@ func ErrorExternalNotFound(path string) error { } } +var onnxExpectedStructMessage = `For ONNX models, the path should end in .onnx` + +var tfExpectedStructMessage = `For TensorFlow models, the path should be a directory with the following structure: + 1523423423/ (version prefix, usually a timestamp) + ├── saved_model.pb + └── variables/ + ├── variables.index + ├── variables.data-00000-of-00003 + ├── variables.data-00001-of-00003 + └── variables.data-00002-of-...` + func ErrorUnableToInferModelFormat() error { + message := ModelFormatKey + " not specified, and could not be inferred\n" + onnxExpectedStructMessage + "\n" + tfExpectedStructMessage return Error{ Kind: ErrUnableToInferModelFormat, - message: "unable to infer " + ModelFormatKey + ": path to model should end in .zip for TensorFlow models, .onnx for ONNX models, or the " + ModelFormatKey + " key must be specified", + message: message, + } +} +func ErrorInvalidTensorflowDir(path string) error { + message := "invalid TF export directory.\n" + message += tfExpectedStructMessage + return Error{ + Kind: ErrInvalidTensorflowDir, + message: message, } } diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index 26c89ba7dd..23edf4e915 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -31,6 +31,7 @@ from cortex import consts from cortex.lib import util, tf_lib, package, Context from cortex.lib.log import get_logger +from cortex.lib.storage import S3, LocalStorage from cortex.lib.exceptions import CortexException, UserRuntimeException, UserException from cortex.lib.context import create_transformer_inputs_from_map @@ -437,6 +438,23 @@ def get_signature(app_name, api_name): return jsonify(response) +def download_dir_external(ctx, s3_path, local_path): + util.mkdir_p(local_path) + bucket_name, prefix = ctx.storage.deconstruct_s3_path(s3_path) + storage_client = S3(bucket_name, client_config={}) + objects = [obj[len(prefix) + 1 :] for obj in storage_client.search(prefix=prefix)] + prefix = prefix + "/" if prefix[-1] != "/" else prefix + version = prefix.split("/")[-2] + local_path = os.path.join(local_path, version) + for obj in objects: + if not os.path.exists(os.path.dirname(obj)): + util.mkdir_p(os.path.join(local_path, os.path.dirname(obj))) + + ctx.storage.download_file_external( + bucket_name + "/" + os.path.join(prefix, obj), os.path.join(local_path, obj) + ) + + def validate_model_dir(model_dir): """ validates that model_dir has the expected directory tree. @@ -489,23 +507,22 @@ def start(args): if api.get("request_handler") is not None: local_cache["request_handler"] = ctx.get_request_handler_impl(api["name"]) - if not util.is_resource_ref(api["model"]): - if not os.path.isdir(args.model_dir): - ctx.storage.download_and_unzip_external(api["model"], args.model_dir) + if not os.path.isdir(args.model_dir): + if util.is_resource_ref(api["model"]): + model_name = util.get_resource_ref(api["model"]) + model = ctx.models[model_name] + ctx.storage.download_and_unzip(model["key"], args.model_dir) + else: + download_dir_external(ctx, api["model"], args.model_dir) - if args.only_download: - return - else: + if args.only_download: + return + + if util.is_resource_ref(api["model"]): model_name = util.get_resource_ref(api["model"]) model = ctx.models[model_name] estimator = ctx.estimators[model["estimator"]] - if not os.path.isdir(args.model_dir): - ctx.storage.download_and_unzip(model["key"], args.model_dir) - - if args.only_download: - return - local_cache["model"] = model local_cache["estimator"] = estimator local_cache["target_col"] = ctx.columns[util.get_resource_ref(model["target_column"])]