diff --git a/docs/applications/resources/environments.md b/docs/applications/resources/environments.md index 45acd8525a..b24f4b6497 100644 --- a/docs/applications/resources/environments.md +++ b/docs/applications/resources/environments.md @@ -26,6 +26,7 @@ Transfer data at scale from data warehouses like S3 into the Cortex environment. data: type: csv # file type (required) path: s3a:/// # S3 is currently supported (required) + region: us-west-2 # S3 region (default: us-west-2) drop_null: # drop any rows that contain at least 1 null value (default: false) csv_config: # optional configuration that can be provided schema: @@ -64,6 +65,7 @@ csv_config: data: type: parquet # file type (required) path: s3a:/// # S3 is currently supported (required) + region: us-west-2 # S3 region (default: us-west-2) drop_null: # drop any rows that contain at least 1 null value (default: false) schema: - parquet_column_name: # name of the column in the parquet file (required) diff --git a/pkg/lib/aws/s3.go b/pkg/lib/aws/s3.go index fd2c576acd..9c62935b2b 100644 --- a/pkg/lib/aws/s3.go +++ b/pkg/lib/aws/s3.go @@ -21,38 +21,46 @@ import ( "path/filepath" "strings" - "github.com/cortexlabs/cortex/pkg/lib/files" - "github.com/cortexlabs/cortex/pkg/lib/msgpack" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/files" "github.com/cortexlabs/cortex/pkg/lib/json" + "github.com/cortexlabs/cortex/pkg/lib/msgpack" "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" ) -func (c *Client) S3Path(key string) string { - return "s3://" + filepath.Join(c.Bucket, key) -} +const DefaultS3Region string = endpoints.UsWest2RegionID -func (c *Client) IsS3File(key string) (bool, error) { - return c.IsS3FileExternal(c.Bucket, key) -} +var S3Regions strset.Set -func (c *Client) IsS3Dir(dirPath string) (bool, error) { - prefix := s.EnsureSuffix(dirPath, "/") - return c.IsS3Prefix(prefix) +func init() { + resolver := endpoints.DefaultResolver() + partitions := resolver.(endpoints.EnumPartitions).Partitions() + + S3Regions = strset.New() + + for _, p := range partitions { + if p.ID() == endpoints.AwsPartitionID || p.ID() == endpoints.AwsCnPartitionID { + for id := range p.Regions() { + S3Regions.Add(id) + } + } + } } -func (c *Client) IsS3Prefix(prefix string) (bool, error) { - return c.IsS3PrefixExternal(c.Bucket, prefix) +func (c *Client) S3Path(key string) string { + return "s3://" + filepath.Join(c.Bucket, key) } -func (c *Client) IsS3FileExternal(bucket string, key string) (bool, error) { +func (c *Client) IsS3File(key string) (bool, error) { _, err := c.s3Client.HeadObject(&s3.HeadObjectInput{ - Bucket: aws.String(bucket), + Bucket: aws.String(c.Bucket), Key: aws.String(key), }) @@ -66,9 +74,14 @@ func (c *Client) IsS3FileExternal(bucket string, key string) (bool, error) { return true, nil } -func (c *Client) IsS3PrefixExternal(bucket string, prefix string) (bool, error) { +func (c *Client) IsS3Dir(dirPath string) (bool, error) { + prefix := s.EnsureSuffix(dirPath, "/") + return c.IsS3Prefix(prefix) +} + +func (c *Client) IsS3Prefix(prefix string) (bool, error) { out, err := c.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ - Bucket: aws.String(bucket), + Bucket: aws.String(c.Bucket), Prefix: aws.String(prefix), }) @@ -80,14 +93,6 @@ func (c *Client) IsS3PrefixExternal(bucket string, prefix string) (bool, error) return hasPrefix, nil } -func (c *Client) IsS3aPrefixExternal(s3aPath string) (bool, error) { - bucket, prefix, err := SplitS3aPath(s3aPath) - if err != nil { - return false, err - } - return c.IsS3PrefixExternal(bucket, prefix) -} - func (c *Client) UploadBytesToS3(data []byte, key string) error { _, err := c.s3Client.PutObject(&s3.PutObjectInput{ Body: bytes.NewReader(data), @@ -253,3 +258,29 @@ func SplitS3aPath(s3aPath string) (string, string, error) { return bucket, key, nil } + +func IsS3PrefixExternal(bucket string, prefix string, region string) (bool, error) { + sess := session.Must(session.NewSession(&aws.Config{ + Region: aws.String(region), + })) + + out, err := s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + }) + + if err != nil { + return false, errors.Wrap(err, prefix) + } + + hasPrefix := *out.KeyCount > 0 + return hasPrefix, nil +} + +func IsS3aPrefixExternal(s3aPath string, region string) (bool, error) { + bucket, prefix, err := SplitS3aPath(s3aPath) + if err != nil { + return false, err + } + return IsS3PrefixExternal(bucket, prefix, region) +} diff --git a/pkg/operator/api/userconfig/environments.go b/pkg/operator/api/userconfig/environments.go index 58956033e9..358cf2a25b 100644 --- a/pkg/operator/api/userconfig/environments.go +++ b/pkg/operator/api/userconfig/environments.go @@ -17,6 +17,7 @@ limitations under the License. package userconfig import ( + "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/configreader" cr "github.com/cortexlabs/cortex/pkg/lib/configreader" "github.com/cortexlabs/cortex/pkg/lib/errors" @@ -120,7 +121,7 @@ var logLevelValidation = &cr.StructValidation{ type Data interface { GetIngestedColumns() []string - GetExternalPath() string + GetExternalData() ExternalData Validate() error } @@ -130,11 +131,11 @@ var dataValidation = &cr.InterfaceStructValidation{ ParsedInterfaceStructTypes: map[interface{}]*cr.InterfaceStructType{ CSVEnvironmentDataType: { Type: (*CSVData)(nil), - StructFieldValidations: csvDataFieldValidations, + StructFieldValidations: append(csvDataFieldValidations, externalDataValidation...), }, ParquetEnvironmentDataType: { Type: (*ParquetData)(nil), - StructFieldValidations: parquetDataFieldValidations, + StructFieldValidations: append(parquetDataFieldValidations, externalDataValidation...), }, }, Parser: func(str string) (interface{}, error) { @@ -142,12 +143,33 @@ var dataValidation = &cr.InterfaceStructValidation{ }, } +type ExternalData struct { + Path string `json:"path" yaml:"path"` + Region string `json:"region" yaml:"region"` +} + +var externalDataValidation = []*cr.StructFieldValidation{ + { + StructField: "Path", + StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{ + Required: true, + }), + }, + { + StructField: "Region", + StringValidation: &cr.StringValidation{ + Default: aws.DefaultS3Region, + AllowedValues: aws.S3Regions.Slice(), + }, + }, +} + type CSVData struct { Type EnvironmentDataType `json:"type" yaml:"type"` - Path string `json:"path" yaml:"path"` Schema []string `json:"schema" yaml:"schema"` DropNull bool `json:"drop_null" yaml:"drop_null"` CSVConfig *CSVConfig `json:"csv_config" yaml:"csv_config"` + ExternalData } // CSVConfig is SPARK_VERSION dependent @@ -172,12 +194,6 @@ type CSVConfig struct { } var csvDataFieldValidations = []*cr.StructFieldValidation{ - { - StructField: "Path", - StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{ - Required: true, - }), - }, { StructField: "Schema", StringListValidation: &cr.StringListValidation{ @@ -273,18 +289,12 @@ var csvDataFieldValidations = []*cr.StructFieldValidation{ type ParquetData struct { Type EnvironmentDataType `json:"type" yaml:"type"` - Path string `json:"path" yaml:"path"` Schema []*ParquetColumn `json:"schema" yaml:"schema"` DropNull bool `json:"drop_null" yaml:"drop_null"` + ExternalData } var parquetDataFieldValidations = []*cr.StructFieldValidation{ - { - StructField: "Path", - StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{ - Required: true, - }), - }, { StructField: "Schema", StructListValidation: &cr.StructListValidation{ @@ -381,12 +391,12 @@ func (parqData *ParquetData) Validate() error { return nil } -func (csvData *CSVData) GetExternalPath() string { - return csvData.Path +func (csvData *CSVData) GetExternalData() ExternalData { + return csvData.ExternalData } -func (parqData *ParquetData) GetExternalPath() string { - return parqData.Path +func (parqData *ParquetData) GetExternalData() ExternalData { + return parqData.ExternalData } func (csvData *CSVData) GetIngestedColumns() []string { diff --git a/pkg/operator/workloads/data_job.go b/pkg/operator/workloads/data_job.go index 5c26ab15c3..2980307583 100644 --- a/pkg/operator/workloads/data_job.go +++ b/pkg/operator/workloads/data_job.go @@ -25,6 +25,7 @@ import ( "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/argo" + "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" @@ -183,10 +184,13 @@ func dataWorkloadSpecs(ctx *context.Context) ([]*WorkloadSpec, error) { shouldIngest := !rawFileExists if shouldIngest { - externalDataPath := ctx.Environment.Data.GetExternalPath() - externalDataExists, err := config.AWS.IsS3aPrefixExternal(externalDataPath) - if err != nil || !externalDataExists { - return nil, errors.Wrap(ErrorUserDataUnavailable(externalDataPath), ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey) + externalData := ctx.Environment.Data.GetExternalData() + externalDataExists, err := aws.IsS3aPrefixExternal(externalData.Path, externalData.Region) + if err != nil { + return nil, errors.Wrap(err, externalData.Path, ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey) + } + if !externalDataExists { + return nil, errors.Wrap(ErrorExternalDataUnavailable(externalData.Path), ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey) } for _, rawColumn := range ctx.RawColumns { allComputes = append(allComputes, rawColumn.GetCompute()) diff --git a/pkg/operator/workloads/errors.go b/pkg/operator/workloads/errors.go index bde36ad8f9..aa1b172e98 100644 --- a/pkg/operator/workloads/errors.go +++ b/pkg/operator/workloads/errors.go @@ -24,7 +24,7 @@ type ErrorKind int const ( ErrUnknown ErrorKind = iota - ErrUserDataUnavailable + ErrExternalDataUnavailable ErrMoreThanOneWorkflow ErrContextAppMismatch ErrWorkflowAppMismatch @@ -35,8 +35,8 @@ const ( var errorKinds = []string{ "err_unknown", - "err_user_data_unavailable", - "error_more_than_one_workflow", + "err_external_data_unavailable", + "err_more_than_one_workflow", "err_context_app_mismatch", "err_workflow_app_mismatch", "err_cortex_installation_broken", @@ -89,10 +89,10 @@ func (e Error) Error() string { return e.message } -func ErrorUserDataUnavailable(s3Path string) error { +func ErrorExternalDataUnavailable(s3Path string) error { return Error{ - Kind: ErrUserDataUnavailable, - message: fmt.Sprintf("the file at %s does not exist, or your cluster does not have access to it", s3Path), + Kind: ErrExternalDataUnavailable, + message: fmt.Sprintf("the data at %s does not exist", s3Path), } }