Skip to content

External constants support #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/applications/advanced/external-models.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $ aws s3 cp model.zip s3://your-bucket/model.zip
- kind: api
name: my-api
external_model:
path: s3://your-bucket/model.zip
path: s3://my-bucket/model.zip
region: us-west-2
compute:
replicas: 5
Expand Down
4 changes: 2 additions & 2 deletions docs/applications/resources/apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Serve models at scale and use them to build smarter applications.
name: <string> # API name (required)
model: <string> # reference to a Cortex model (this or external_model must be specified)
external_model: # (this or model must be specified)
path: <string> # path to a zipped model dir
region: <string> # S3 region (default: us-west-2)
path: <string> # path to a zipped model dir (e.g. s3://my-bucket/model.zip)
region: <string> # S3 region (default: us-west-2)
compute:
replicas: <int> # number of replicas to launch (default: 1)
cpu: <string> # CPU request (default: Null)
Expand Down
6 changes: 5 additions & 1 deletion docs/applications/resources/constants.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ Constants represent literal values which can be used in other Cortex resources.
- kind: constant
name: <string> # constant name (required)
type: <output_type> # the type of the constant (optional, will be inferred from value if not specified)
value: <output_value> # a literal value (required)
value: <output_value> # a literal value (this or external must be specified)
external: # (this or value must be specified)
path: <string> # path to a JSON object (e.g. s3://my-bucket/constant.json)
region: <string> # S3 region (default: us-west-2)

```

See [Data Types](data-types.md) for details about output types and values.
Expand Down
4 changes: 2 additions & 2 deletions docs/applications/resources/environments.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Transfer data at scale from data warehouses like S3 into the Cortex environment.
data:
type: csv # file type (required)
path: s3a://<bucket_name>/<file_name> # S3 is currently supported (required)
region: us-west-2 # S3 region (default: us-west-2)
region: us-west-2 # S3 region (default: us-west-2)
drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
csv_config: <csv_config> # optional configuration that can be provided
schema:
Expand Down Expand Up @@ -65,7 +65,7 @@ csv_config:
data:
type: parquet # file type (required)
path: s3a://<bucket_name>/<file_name> # S3 is currently supported (required)
region: us-west-2 # S3 region (default: us-west-2)
region: us-west-2 # S3 region (default: us-west-2)
drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
schema:
- parquet_column_name: <string> # name of the column in the parquet file (required)
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/api/userconfig/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (api *API) Validate() error {
}

if ok, err := aws.IsS3FileExternal(bucket, key, api.ExternalModel.Region); err != nil || !ok {
return errors.Wrap(ErrorExternalModelNotFound(api.ExternalModel.Path), Identify(api), ExternalModelKey, PathKey)
return errors.Wrap(ErrorExternalNotFound(api.ExternalModel.Path), Identify(api), ExternalModelKey, PathKey)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/operator/api/userconfig/config_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
PathKey = "path"
OutputTypeKey = "output_type"
TagsKey = "tags"
ExternalKey = "external"

// input schema options
OptionalOptKey = "_optional"
Expand Down
58 changes: 54 additions & 4 deletions pkg/operator/api/userconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package userconfig

import (
"github.com/cortexlabs/cortex/pkg/lib/aws"
cr "github.com/cortexlabs/cortex/pkg/lib/configreader"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/operator/api/resource"
Expand All @@ -26,9 +27,10 @@ type Constants []*Constant

type Constant struct {
ResourceFields
Type OutputSchema `json:"type" yaml:"type"`
Value interface{} `json:"value" yaml:"value"`
Tags Tags `json:"tags" yaml:"tags"`
Type OutputSchema `json:"type" yaml:"type"`
Value interface{} `json:"value" yaml:"value"`
Tags Tags `json:"tags" yaml:"tags"`
External *ExternalConstant `json:"external" yaml:"external"`
}

var constantValidation = &cr.StructValidation{
Expand All @@ -50,14 +52,43 @@ var constantValidation = &cr.StructValidation{
{
StructField: "Value",
InterfaceValidation: &cr.InterfaceValidation{
Required: true,
Required: false,
},
},
{
StructField: "External",
StructValidation: externalModelFieldValidation,
},
tagsFieldValidation,
typeFieldValidation,
},
}

// ExternalConstant points a Constant at a value stored in an external file
// rather than one written inline via the constant's "value" field.
type ExternalConstant struct {
// Path is the location of the external file. Constant.Validate splits it
// with aws.SplitS3Path and checks that the resulting bucket/key is reachable.
Path string `json:"path" yaml:"path"`
// Region is passed to aws.IsS3FileExternal alongside the bucket and key
// when Constant.Validate checks that the file exists.
Region string `json:"region" yaml:"region"`
}

// externalConstantFieldValidation is the configreader validation for an
// ExternalConstant block: Path is a required string checked with
// cr.GetS3PathValidator(), and Region defaults to aws.DefaultS3Region and is
// limited to the values in aws.S3Regions. DefaultNil is set, presumably so an
// omitted block is left as a nil pointer — confirm against configreader.
//
// NOTE(review): the "External" entry in constantValidation appears to
// reference externalModelFieldValidation rather than this variable, which
// would leave this declaration unused — confirm which one is intended.
var externalConstantFieldValidation = &cr.StructValidation{
DefaultNil: true,
StructFieldValidations: []*cr.StructFieldValidation{
{
StructField: "Path",
StringValidation: &cr.StringValidation{
Validator: cr.GetS3PathValidator(),
Required: true,
},
},
{
StructField: "Region",
StringValidation: &cr.StringValidation{
Default: aws.DefaultS3Region,
AllowedValues: aws.S3Regions.Slice(),
},
},
},
}

func (constants Constants) Validate() error {
for _, constant := range constants {
if err := constant.Validate(); err != nil {
Expand All @@ -79,6 +110,25 @@ func (constants Constants) Validate() error {
}

func (constant *Constant) Validate() error {
if constant.External == nil && constant.Value == nil {
return errors.Wrap(ErrorSpecifyOnlyOneMissing(ValueKey, ExternalKey), Identify(constant))
}

if constant.External != nil && constant.Value != nil {
return errors.Wrap(ErrorSpecifyOnlyOne(ValueKey, ExternalKey), Identify(constant))
}

if constant.External != nil {
bucket, key, err := aws.SplitS3Path(constant.External.Path)
if err != nil {
return errors.Wrap(err, Identify(constant), ExternalKey, PathKey)
}

if ok, err := aws.IsS3FileExternal(bucket, key, constant.External.Region); err != nil || !ok {
return errors.Wrap(ErrorExternalNotFound(constant.External.Path), Identify(constant), ExternalKey, PathKey)
}
}

if constant.Type != nil {
castedValue, err := CastOutputValue(constant.Value, constant.Type)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/operator/api/userconfig/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const (
ErrEnvSchemaMismatch
ErrExtraResourcesWithExternalAPIs
ErrImplDoesNotExist
ErrExternalModelNotFound
ErrExternalNotFound
)

var errorKinds = []string{
Expand Down Expand Up @@ -125,10 +125,10 @@ var errorKinds = []string{
"err_env_schema_mismatch",
"err_extra_resources_with_external_a_p_is",
"err_impl_does_not_exist",
"err_external_model_not_found",
"err_external_not_found",
}

var _ = [1]int{}[int(ErrExternalModelNotFound)-(len(errorKinds)-1)] // Ensure list length matches
var _ = [1]int{}[int(ErrExternalNotFound)-(len(errorKinds)-1)] // Ensure list length matches

func (t ErrorKind) String() string {
return errorKinds[t]
Expand Down Expand Up @@ -578,9 +578,9 @@ func ErrorImplDoesNotExist(path string) error {
}
}

func ErrorExternalModelNotFound(path string) error {
func ErrorExternalNotFound(path string) error {
return Error{
Kind: ErrExternalModelNotFound,
Kind: ErrExternalNotFound,
message: fmt.Sprintf("%s: file not found or inaccessible", path),
}
}
7 changes: 6 additions & 1 deletion pkg/workloads/lib/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,12 @@ def populate_values(self, input, input_schema, preserve_column_refs):
if util.is_resource_ref(input):
res_name = util.get_resource_ref(input)
if res_name in self.constants:
const_val = self.constants[res_name]["value"]
if self.constants[res_name]["value"]:
const_val = self.constants[res_name]["value"]
elif self.constants[res_name]["external"]:
const_val = self.storage.get_json_external(
self.constants[res_name]["external"]["path"]
)
try:
return self.populate_values(const_val, input_schema, preserve_column_refs)
except CortexException as e:
Expand Down
17 changes: 15 additions & 2 deletions pkg/workloads/lib/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ def _get_matching_s3_keys_generator(self, prefix="", suffix=""):
def _upload_string_to_s3(self, string, key):
self.s3.put_object(Bucket=self.bucket, Key=key, Body=string)

def _read_bytes_from_s3(self, key, allow_missing=False):
def _read_bytes_from_s3(self, key, allow_missing=False, ext_bucket=None):
try:
byte_array = self.s3.get_object(Bucket=self.bucket, Key=key)["Body"].read()
bucket = self.bucket
if ext_bucket is not None:
bucket = ext_bucket
byte_array = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read()
except self.s3.exceptions.NoSuchKey as e:
if allow_missing:
return None
Expand Down Expand Up @@ -190,3 +193,13 @@ def download_file_external(self, s3_path, local_path):
return local_path
except Exception as e:
raise CortexException("bucket " + bucket, "key " + key) from e

def get_json_external(self, s3_path):
    """Read a JSON document from an S3 path outside this storage's own bucket.

    Args:
        s3_path: Full S3 path of the object; it is split into bucket and key
            with self.deconstruct_s3_path.

    Returns:
        The decoded JSON value, or None if the underlying read returns None.

    Raises:
        CortexException: if the path cannot be split, or if reading or
            decoding the object fails.
    """
    # Split the path before entering the main try block: previously a failure
    # here left `bucket` and `key` unbound, so the handler below raised
    # UnboundLocalError while trying to build its own error message.
    try:
        bucket, key = self.deconstruct_s3_path(s3_path)
    except Exception as e:
        raise CortexException("s3 path " + str(s3_path)) from e

    try:
        obj = self._read_bytes_from_s3(key, ext_bucket=bucket)
        if obj is None:
            return None
        return json.loads(obj.decode("utf-8"))
    except Exception as e:
        raise CortexException("bucket " + bucket, "key " + key) from e
2 changes: 1 addition & 1 deletion pkg/workloads/spark_job/test/unit/spark_util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def test_read_parquet_infer_invalid(spark, write_parquet_file, ctx_obj, get_cont
},
},
{
"data": [("1", 0.1, "yolo"), ("1", 1.0, "yolo"), ("1", 1.1, "yolo")],
"data": [("1", 0.1, "a"), ("1", 1.0, "a"), ("1", 1.1, "a")],
"schema": StructType(
[
StructField("a_str", StringType()),
Expand Down