Skip to content

Add region field to input data #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/applications/resources/environments.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Transfer data at scale from data warehouses like S3 into the Cortex environment.
data:
type: csv # file type (required)
path: s3a://<bucket_name>/<file_name> # S3 is currently supported (required)
region: us-west-2 # S3 region (default: us-west-2)
drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
csv_config: <csv_config> # optional configuration that can be provided
schema:
Expand Down Expand Up @@ -64,6 +65,7 @@ csv_config:
data:
type: parquet # file type (required)
path: s3a://<bucket_name>/<file_name> # S3 is currently supported (required)
region: us-west-2 # S3 region (default: us-west-2)
drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
schema:
- parquet_column_name: <string> # name of the column in the parquet file (required)
Expand Down
83 changes: 57 additions & 26 deletions pkg/lib/aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,46 @@ import (
"path/filepath"
"strings"

"github.com/cortexlabs/cortex/pkg/lib/files"
"github.com/cortexlabs/cortex/pkg/lib/msgpack"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"

"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/files"
"github.com/cortexlabs/cortex/pkg/lib/json"
"github.com/cortexlabs/cortex/pkg/lib/msgpack"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
)

func (c *Client) S3Path(key string) string {
return "s3://" + filepath.Join(c.Bucket, key)
}
const DefaultS3Region string = endpoints.UsWest2RegionID

func (c *Client) IsS3File(key string) (bool, error) {
return c.IsS3FileExternal(c.Bucket, key)
}
var S3Regions strset.Set

func (c *Client) IsS3Dir(dirPath string) (bool, error) {
prefix := s.EnsureSuffix(dirPath, "/")
return c.IsS3Prefix(prefix)
func init() {
resolver := endpoints.DefaultResolver()
partitions := resolver.(endpoints.EnumPartitions).Partitions()

S3Regions = strset.New()

for _, p := range partitions {
if p.ID() == endpoints.AwsPartitionID || p.ID() == endpoints.AwsCnPartitionID {
for id := range p.Regions() {
S3Regions.Add(id)
}
}
}
}

func (c *Client) IsS3Prefix(prefix string) (bool, error) {
return c.IsS3PrefixExternal(c.Bucket, prefix)
// S3Path returns the fully qualified s3:// path for the given key within
// the client's configured bucket.
func (c *Client) S3Path(key string) string {
	joined := filepath.Join(c.Bucket, key)
	return "s3://" + joined
}

func (c *Client) IsS3FileExternal(bucket string, key string) (bool, error) {
func (c *Client) IsS3File(key string) (bool, error) {
_, err := c.s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(bucket),
Bucket: aws.String(c.Bucket),
Key: aws.String(key),
})

Expand All @@ -66,9 +74,14 @@ func (c *Client) IsS3FileExternal(bucket string, key string) (bool, error) {
return true, nil
}

func (c *Client) IsS3PrefixExternal(bucket string, prefix string) (bool, error) {
func (c *Client) IsS3Dir(dirPath string) (bool, error) {
prefix := s.EnsureSuffix(dirPath, "/")
return c.IsS3Prefix(prefix)
}

func (c *Client) IsS3Prefix(prefix string) (bool, error) {
out, err := c.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Bucket: aws.String(c.Bucket),
Prefix: aws.String(prefix),
})

Expand All @@ -80,14 +93,6 @@ func (c *Client) IsS3PrefixExternal(bucket string, prefix string) (bool, error)
return hasPrefix, nil
}

func (c *Client) IsS3aPrefixExternal(s3aPath string) (bool, error) {
bucket, prefix, err := SplitS3aPath(s3aPath)
if err != nil {
return false, err
}
return c.IsS3PrefixExternal(bucket, prefix)
}

func (c *Client) UploadBytesToS3(data []byte, key string) error {
_, err := c.s3Client.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(data),
Expand Down Expand Up @@ -253,3 +258,29 @@ func SplitS3aPath(s3aPath string) (string, string, error) {

return bucket, key, nil
}

// IsS3PrefixExternal reports whether at least one object exists under the
// given prefix in an arbitrary bucket, querying the specified region with a
// standalone session (unlike the Client methods, which are bound to the
// operator's own bucket).
func IsS3PrefixExternal(bucket string, prefix string, region string) (bool, error) {
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String(region),
	}))

	out, err := s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(prefix),
	})

	if err != nil {
		return false, errors.Wrap(err, prefix)
	}

	// aws.Int64Value is nil-safe; dereferencing out.KeyCount directly would
	// panic if the SDK ever returned a response without it.
	hasPrefix := aws.Int64Value(out.KeyCount) > 0
	return hasPrefix, nil
}

// IsS3aPrefixExternal reports whether any object exists under the prefix
// encoded in the given s3a:// path, checking the specified S3 region.
func IsS3aPrefixExternal(s3aPath string, region string) (bool, error) {
	bucket, prefix, splitErr := SplitS3aPath(s3aPath)
	if splitErr != nil {
		return false, splitErr
	}
	return IsS3PrefixExternal(bucket, prefix, region)
}
52 changes: 31 additions & 21 deletions pkg/operator/api/userconfig/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package userconfig

import (
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/configreader"
cr "github.com/cortexlabs/cortex/pkg/lib/configreader"
"github.com/cortexlabs/cortex/pkg/lib/errors"
Expand Down Expand Up @@ -120,7 +121,7 @@ var logLevelValidation = &cr.StructValidation{

type Data interface {
GetIngestedColumns() []string
GetExternalPath() string
GetExternalData() ExternalData
Validate() error
}

Expand All @@ -130,24 +131,45 @@ var dataValidation = &cr.InterfaceStructValidation{
ParsedInterfaceStructTypes: map[interface{}]*cr.InterfaceStructType{
CSVEnvironmentDataType: {
Type: (*CSVData)(nil),
StructFieldValidations: csvDataFieldValidations,
StructFieldValidations: append(csvDataFieldValidations, externalDataValidation...),
},
ParquetEnvironmentDataType: {
Type: (*ParquetData)(nil),
StructFieldValidations: parquetDataFieldValidations,
StructFieldValidations: append(parquetDataFieldValidations, externalDataValidation...),
},
},
Parser: func(str string) (interface{}, error) {
return EnvironmentDataTypeFromString(str), nil
},
}

// ExternalData identifies an externally hosted data source: an s3a:// path
// and the S3 region of the bucket. It is embedded in both CSVData and
// ParquetData so the path/region fields and their validation are shared.
type ExternalData struct {
	Path   string `json:"path" yaml:"path"`
	Region string `json:"region" yaml:"region"`
}

// externalDataValidation holds the shared field validations for ExternalData;
// it is appended to both the CSV and parquet data validations.
var externalDataValidation = []*cr.StructFieldValidation{
	{
		// Path is a required s3a:// URL pointing at the external data.
		StructField: "Path",
		StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{
			Required: true,
		}),
	},
	{
		// Region is optional: it defaults to aws.DefaultS3Region and must be
		// one of the region IDs collected in aws.S3Regions.
		StructField: "Region",
		StringValidation: &cr.StringValidation{
			Default:       aws.DefaultS3Region,
			AllowedValues: aws.S3Regions.Slice(),
		},
	},
}

type CSVData struct {
Type EnvironmentDataType `json:"type" yaml:"type"`
Path string `json:"path" yaml:"path"`
Schema []string `json:"schema" yaml:"schema"`
DropNull bool `json:"drop_null" yaml:"drop_null"`
CSVConfig *CSVConfig `json:"csv_config" yaml:"csv_config"`
ExternalData
}

// CSVConfig is SPARK_VERSION dependent
Expand All @@ -172,12 +194,6 @@ type CSVConfig struct {
}

var csvDataFieldValidations = []*cr.StructFieldValidation{
{
StructField: "Path",
StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{
Required: true,
}),
},
{
StructField: "Schema",
StringListValidation: &cr.StringListValidation{
Expand Down Expand Up @@ -273,18 +289,12 @@ var csvDataFieldValidations = []*cr.StructFieldValidation{

type ParquetData struct {
Type EnvironmentDataType `json:"type" yaml:"type"`
Path string `json:"path" yaml:"path"`
Schema []*ParquetColumn `json:"schema" yaml:"schema"`
DropNull bool `json:"drop_null" yaml:"drop_null"`
ExternalData
}

var parquetDataFieldValidations = []*cr.StructFieldValidation{
{
StructField: "Path",
StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{
Required: true,
}),
},
{
StructField: "Schema",
StructListValidation: &cr.StructListValidation{
Expand Down Expand Up @@ -381,12 +391,12 @@ func (parqData *ParquetData) Validate() error {
return nil
}

func (csvData *CSVData) GetExternalPath() string {
return csvData.Path
func (csvData *CSVData) GetExternalData() ExternalData {
return csvData.ExternalData
}

func (parqData *ParquetData) GetExternalPath() string {
return parqData.Path
func (parqData *ParquetData) GetExternalData() ExternalData {
return parqData.ExternalData
}

func (csvData *CSVData) GetIngestedColumns() []string {
Expand Down
12 changes: 8 additions & 4 deletions pkg/operator/workloads/data_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/argo"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
Expand Down Expand Up @@ -183,10 +184,13 @@ func dataWorkloadSpecs(ctx *context.Context) ([]*WorkloadSpec, error) {

shouldIngest := !rawFileExists
if shouldIngest {
externalDataPath := ctx.Environment.Data.GetExternalPath()
externalDataExists, err := config.AWS.IsS3aPrefixExternal(externalDataPath)
if err != nil || !externalDataExists {
return nil, errors.Wrap(ErrorUserDataUnavailable(externalDataPath), ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey)
externalData := ctx.Environment.Data.GetExternalData()
externalDataExists, err := aws.IsS3aPrefixExternal(externalData.Path, externalData.Region)
if err != nil {
return nil, errors.Wrap(err, externalData.Path, ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey)
}
if !externalDataExists {
return nil, errors.Wrap(ErrorExternalDataUnavailable(externalData.Path), ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey)
}
for _, rawColumn := range ctx.RawColumns {
allComputes = append(allComputes, rawColumn.GetCompute())
Expand Down
12 changes: 6 additions & 6 deletions pkg/operator/workloads/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ErrorKind int

const (
ErrUnknown ErrorKind = iota
ErrUserDataUnavailable
ErrExternalDataUnavailable
ErrMoreThanOneWorkflow
ErrContextAppMismatch
ErrWorkflowAppMismatch
Expand All @@ -35,8 +35,8 @@ const (

var errorKinds = []string{
"err_unknown",
"err_user_data_unavailable",
"error_more_than_one_workflow",
"err_external_data_unavailable",
"err_more_than_one_workflow",
"err_context_app_mismatch",
"err_workflow_app_mismatch",
"err_cortex_installation_broken",
Expand Down Expand Up @@ -89,10 +89,10 @@ func (e Error) Error() string {
return e.message
}

func ErrorUserDataUnavailable(s3Path string) error {
func ErrorExternalDataUnavailable(s3Path string) error {
return Error{
Kind: ErrUserDataUnavailable,
message: fmt.Sprintf("the file at %s does not exist, or your cluster does not have access to it", s3Path),
Kind: ErrExternalDataUnavailable,
message: fmt.Sprintf("the data at %s does not exist", s3Path),
}
}

Expand Down