Skip to content

Commit ac4de04

Browse files
authored
Implement TTL for AsyncAPI workloads (#2151)
1 parent 9f64ef7 commit ac4de04

File tree

35 files changed

+444
-207
lines changed

35 files changed

+444
-207
lines changed

async-gateway/main.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ func main() {
7878
}()
7979

8080
var (
81-
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
82-
queueURL = flag.String("queue", "", "SQS queue URL")
83-
region = flag.String("region", "", "AWS region")
84-
bucket = flag.String("bucket", "", "AWS bucket")
85-
clusterName = flag.String("cluster", "", "cluster name")
81+
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
82+
queueURL = flag.String("queue", "", "SQS queue URL")
83+
region = flag.String("region", "", "AWS region")
84+
bucket = flag.String("bucket", "", "AWS bucket")
85+
clusterUID = flag.String("cluster-uid", "", "cluster UID")
8686
)
8787
flag.Parse()
8888

@@ -93,8 +93,8 @@ func main() {
9393
log.Fatal("missing required option: -region")
9494
case *bucket == "":
9595
log.Fatal("missing required option: -bucket")
96-
case *clusterName == "":
97-
log.Fatal("missing required option: -cluster")
96+
case *clusterUID == "":
97+
log.Fatal("missing required option: -cluster-uid")
9898
}
9999

100100
apiName := flag.Arg(0)
@@ -115,7 +115,7 @@ func main() {
115115
s3Storage := NewS3(sess, *bucket)
116116
sqsQueue := NewSQS(*queueURL, sess)
117117

118-
svc := NewService(*clusterName, apiName, sqsQueue, s3Storage, log)
118+
svc := NewService(*clusterUID, apiName, sqsQueue, s3Storage, log)
119119
ep := NewEndpoint(svc, log)
120120

121121
router := mux.NewRouter()

async-gateway/service.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,21 @@ type Service interface {
3232
}
3333

3434
type service struct {
35-
logger *zap.Logger
36-
queue Queue
37-
storage Storage
38-
clusterName string
39-
apiName string
35+
logger *zap.Logger
36+
queue Queue
37+
storage Storage
38+
clusterUID string
39+
apiName string
4040
}
4141

4242
// NewService creates a new async-gateway service
43-
func NewService(clusterName, apiName string, queue Queue, storage Storage, logger *zap.Logger) Service {
43+
func NewService(clusterUID, apiName string, queue Queue, storage Storage, logger *zap.Logger) Service {
4444
return &service{
45-
logger: logger,
46-
queue: queue,
47-
storage: storage,
48-
clusterName: clusterName,
49-
apiName: apiName,
45+
logger: logger,
46+
queue: queue,
47+
storage: storage,
48+
clusterUID: clusterUID,
49+
apiName: apiName,
5050
}
5151
}
5252

@@ -151,5 +151,5 @@ func (s *service) getStatus(id string) (Status, error) {
151151
}
152152

153153
func (s *service) workloadStoragePrefix() string {
154-
return fmt.Sprintf("%s/apis/%s/workloads", s.clusterName, s.apiName)
154+
return fmt.Sprintf("%s/workloads/%s", s.clusterUID, s.apiName)
155155
}

cli/cmd/cluster.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ import (
2828
"github.com/aws/aws-sdk-go/service/autoscaling"
2929
"github.com/aws/aws-sdk-go/service/ec2"
3030
"github.com/aws/aws-sdk-go/service/elbv2"
31+
"github.com/aws/aws-sdk-go/service/s3"
3132
"github.com/cortexlabs/cortex/cli/cluster"
3233
"github.com/cortexlabs/cortex/cli/types/cliconfig"
3334
"github.com/cortexlabs/cortex/cli/types/flags"
35+
"github.com/cortexlabs/cortex/pkg/consts"
3436
"github.com/cortexlabs/cortex/pkg/lib/archive"
3537
"github.com/cortexlabs/cortex/pkg/lib/aws"
3638
"github.com/cortexlabs/cortex/pkg/lib/console"
@@ -45,6 +47,7 @@ import (
4547
s "github.com/cortexlabs/cortex/pkg/lib/strings"
4648
"github.com/cortexlabs/cortex/pkg/lib/table"
4749
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
50+
libtime "github.com/cortexlabs/cortex/pkg/lib/time"
4851
"github.com/cortexlabs/cortex/pkg/operator/schema"
4952
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
5053
"github.com/cortexlabs/cortex/pkg/types/clusterstate"
@@ -187,6 +190,11 @@ var _clusterUpCmd = &cobra.Command{
187190
exit.Error(err)
188191
}
189192

193+
err = setLifecycleRulesOnClusterUp(awsClient, clusterConfig.Bucket, clusterConfig.ClusterUID)
194+
if err != nil {
195+
exit.Error(err)
196+
}
197+
190198
err = createLogGroupIfNotFound(awsClient, clusterConfig.ClusterName, clusterConfig.Tags)
191199
if err != nil {
192200
exit.Error(err)
@@ -426,6 +434,7 @@ var _clusterDownCmd = &cobra.Command{
426434
if err != nil {
427435
exit.Error(err)
428436
}
437+
bucketName := clusterconfig.BucketName(accountID, accessConfig.ClusterName, accessConfig.Region)
429438

430439
warnIfNotAdmin(awsClient)
431440

@@ -480,11 +489,24 @@ var _clusterDownCmd = &cobra.Command{
480489
fmt.Println()
481490
} else if exitCode == nil || *exitCode != 0 {
482491
out = filterEKSCTLOutput(out)
483-
helpStr := fmt.Sprintf("\nNote: if this error cannot be resolved, please ensure that all CloudFormation stacks for this cluster eventually become fully deleted (%s). If the stack deletion process has failed, please delete the stacks directly from the AWS console (this may require manually deleting particular AWS resources that are blocking the stack deletion)", clusterstate.CloudFormationURL(accessConfig.ClusterName, accessConfig.Region))
492+
template := "\nNote: if this error cannot be resolved, please ensure that all CloudFormation stacks for this cluster eventually become fully deleted (%s)."
493+
template += " If the stack deletion process has failed, please delete the stacks directly from the AWS console (this may require manually deleting particular AWS resources that are blocking the stack deletion)."
494+
template += " In addition to deleting the stacks manually from the AWS console, also make sure to empty and remove the %s bucket"
495+
helpStr := fmt.Sprintf(template, clusterstate.CloudFormationURL(accessConfig.ClusterName, accessConfig.Region), bucketName)
484496
fmt.Println(helpStr)
485497
exit.Error(ErrorClusterDown(out + helpStr))
486498
}
487499

500+
// set lifecycle policy to clean the bucket
501+
fmt.Printf("○ setting lifecycle policy to empty the %s bucket ", bucketName)
502+
err = setLifecycleRulesOnClusterDown(awsClient, bucketName)
503+
if err != nil {
504+
fmt.Printf("\n\nfailed to set lifecycle policy to empty the %s bucket; you can remove the bucket manually via the s3 console: https://s3.console.aws.amazon.com/s3/management/%s\n", bucketName, bucketName)
505+
errors.PrintError(err)
506+
fmt.Println()
507+
}
508+
fmt.Println("✓")
509+
488510
// delete policy after spinning down the cluster (which deletes the roles) because policies can't be deleted if they are attached to roles
489511
policyARN := clusterconfig.DefaultPolicyARN(accountID, accessConfig.ClusterName, accessConfig.Region)
490512
fmt.Printf("○ deleting auto-generated iam policy %s ", policyARN)
@@ -549,6 +571,7 @@ var _clusterDownCmd = &cobra.Command{
549571
}
550572

551573
fmt.Printf("\nplease check CloudFormation to ensure that all resources for the %s cluster eventually become successfully deleted: %s\n", accessConfig.ClusterName, clusterstate.CloudFormationURL(accessConfig.ClusterName, accessConfig.Region))
574+
fmt.Printf("\na lifecycle rule has been applied to the cluster’s %s bucket to empty its contents later today; you can delete the %s bucket via the s3 console once it has been emptied: https://s3.console.aws.amazon.com/s3/management/%s\n", bucketName, bucketName, bucketName)
552575

553576
cachedClusterConfigPath := cachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
554577
os.Remove(cachedClusterConfigPath)
@@ -1119,6 +1142,71 @@ func createS3BucketIfNotFound(awsClient *aws.Client, bucket string, tags map[str
11191142
return err
11201143
}
11211144

1145+
func setLifecycleRulesOnClusterUp(awsClient *aws.Client, bucket, newClusterUID string) error {
1146+
err := awsClient.DeleteLifecycleRules(bucket)
1147+
if err != nil {
1148+
return err
1149+
}
1150+
1151+
clusterUIDs, err := awsClient.ListS3TopLevelDirs(bucket)
1152+
if err != nil {
1153+
return err
1154+
}
1155+
1156+
if len(clusterUIDs)+1 > consts.MaxBucketLifecycleRules {
1157+
return ErrorClusterUIDsLimitInBucket(bucket)
1158+
}
1159+
1160+
expirationDate := libtime.GetCurrentUTCDate().Add(-24 * time.Hour)
1161+
rules := []s3.LifecycleRule{}
1162+
for _, clusterUID := range clusterUIDs {
1163+
rules = append(rules, s3.LifecycleRule{
1164+
Expiration: &s3.LifecycleExpiration{
1165+
Date: &expirationDate,
1166+
},
1167+
ID: pointer.String("cluster-remove-" + clusterUID),
1168+
Filter: &s3.LifecycleRuleFilter{
1169+
Prefix: pointer.String(s.EnsureSuffix(clusterUID, "/")),
1170+
},
1171+
Status: pointer.String("Enabled"),
1172+
})
1173+
}
1174+
1175+
rules = append(rules, s3.LifecycleRule{
1176+
Expiration: &s3.LifecycleExpiration{
1177+
Days: pointer.Int64(consts.AsyncWorkloadsExpirationDays),
1178+
},
1179+
ID: pointer.String("async-workloads-expiry-policy"),
1180+
Filter: &s3.LifecycleRuleFilter{
1181+
Prefix: pointer.String(s.EnsureSuffix(filepath.Join(newClusterUID, "workloads"), "/")),
1182+
},
1183+
Status: pointer.String("Enabled"),
1184+
})
1185+
1186+
return awsClient.SetLifecycleRules(bucket, rules)
1187+
}
1188+
1189+
func setLifecycleRulesOnClusterDown(awsClient *aws.Client, bucket string) error {
1190+
err := awsClient.DeleteLifecycleRules(bucket)
1191+
if err != nil {
1192+
return err
1193+
}
1194+
1195+
expirationDate := libtime.GetCurrentUTCDate().Add(-24 * time.Hour)
1196+
return awsClient.SetLifecycleRules(bucket, []s3.LifecycleRule{
1197+
{
1198+
Expiration: &s3.LifecycleExpiration{
1199+
Date: &expirationDate,
1200+
},
1201+
ID: pointer.String("bucket-cleaner"),
1202+
Filter: &s3.LifecycleRuleFilter{
1203+
Prefix: pointer.String(""),
1204+
},
1205+
Status: pointer.String("Enabled"),
1206+
},
1207+
})
1208+
}
1209+
11221210
func createLogGroupIfNotFound(awsClient *aws.Client, logGroup string, tags map[string]string) error {
11231211
logGroupFound, err := awsClient.DoesLogGroupExist(logGroup)
11241212
if err != nil {

cli/cmd/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const (
7171
ErrAPINameMustBeProvided = "cli.api_name_must_be_provided"
7272
ErrAPINotFoundInConfig = "cli.api_not_found_in_config"
7373
ErrNotSupportedForKindAndType = "cli.not_supported_for_kind_and_type"
74+
ErrClusterUIDsLimitInBucket = "cli.cluster_uids_limit_in_bucket"
7475
)
7576

7677
func ErrorInvalidProvider(providerStr, cliConfigPath string) error {
@@ -296,3 +297,10 @@ func ErrorNotSupportedForKindAndType(kind userconfig.Kind, handlerType userconfi
296297
},
297298
})
298299
}
300+
301+
func ErrorClusterUIDsLimitInBucket(bucket string) error {
302+
return errors.WithStack(&errors.Error{
303+
Kind: ErrClusterUIDsLimitInBucket,
304+
Message: fmt.Sprintf("detected too many top level folders in %s bucket; please empty your bucket and try again", bucket),
305+
})
306+
}

enqueuer/enqueuer.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ const (
4040
)
4141

4242
type EnvConfig struct {
43-
ClusterName string
44-
Region string
45-
Version string
46-
Bucket string
47-
APIName string
48-
JobID string
43+
ClusterUID string
44+
Region string
45+
Version string
46+
Bucket string
47+
APIName string
48+
JobID string
4949
}
5050

5151
// FIXME: all these types should be shared with the cortex webserver (from where the payload is submitted)
@@ -159,13 +159,13 @@ func (e *Enqueuer) Enqueue() (int, error) {
159159
}
160160

161161
func (e *Enqueuer) UploadBatchCount(batchCount int) error {
162-
key := spec.JobBatchCountKey(e.envConfig.ClusterName, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
162+
key := spec.JobBatchCountKey(e.envConfig.ClusterUID, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
163163
return e.aws.UploadStringToS3(s.Int(batchCount), e.envConfig.Bucket, key)
164164
}
165165

166166
func (e *Enqueuer) getJobPayload() (JobSubmission, error) {
167-
// e.g. <cluster name>/jobs/<job_api_kind>/<cortex version>/<api_name>/<job_id>
168-
key := spec.JobPayloadKey(e.envConfig.ClusterName, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
167+
// e.g. <cluster uid>/jobs/<job_api_kind>/<cortex version>/<api_name>/<job_id>
168+
key := spec.JobPayloadKey(e.envConfig.ClusterUID, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
169169

170170
submissionBytes, err := e.aws.ReadBytesFromS3(e.envConfig.Bucket, key)
171171
if err != nil {
@@ -181,7 +181,7 @@ func (e *Enqueuer) getJobPayload() (JobSubmission, error) {
181181
}
182182

183183
func (e *Enqueuer) deleteJobPayload() error {
184-
key := spec.JobPayloadKey(e.envConfig.ClusterName, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
184+
key := spec.JobPayloadKey(e.envConfig.ClusterUID, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
185185
if err := e.aws.DeleteS3File(e.envConfig.Bucket, key); err != nil {
186186
return err
187187
}

enqueuer/helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func s3IteratorFromLister(awsClient *awslib.Client, s3Lister S3Lister, fn func(s
9595
return err
9696
}
9797

98-
err = awsClientForBucket.S3Iterator(bucket, key, false, nil, func(s3Obj *s3.Object) (bool, error) {
98+
err = awsClientForBucket.S3Iterator(bucket, key, false, nil, nil, func(s3Obj *s3.Object) (bool, error) {
9999
s3FilePath := awslib.S3Path(bucket, *s3Obj.Key)
100100

101101
shouldSkip := false

enqueuer/main.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ func createLogger() (*zap.Logger, error) {
6161

6262
func main() {
6363
var (
64-
clusterName string
65-
region string
66-
bucket string
67-
queueURL string
68-
apiName string
69-
jobID string
64+
clusterUID string
65+
region string
66+
bucket string
67+
queueURL string
68+
apiName string
69+
jobID string
7070
)
71-
flag.StringVar(&clusterName, "cluster", os.Getenv("CORTEX_CLUSTER_NAME"), "cluster name (can be set throught the CORTEX_CLUSTER_NAME env variable)")
71+
flag.StringVar(&clusterUID, "cluster-uid", os.Getenv("CORTEX_CLUSTER_UID"), "cluster UID (can be set throught the CORTEX_CLUSTER_UID env variable)")
7272
flag.StringVar(&region, "region", os.Getenv("CORTEX_REGION"), "cluster region (can be set throught the CORTEX_REGION env variable)")
7373
flag.StringVar(&bucket, "bucket", os.Getenv("CORTEX_BUCKET"), "cortex S3 bucket (can be set throught the CORTEX_BUCKET env variable)")
7474
flag.StringVar(&queueURL, "queue", "", "target queue URL to where the api messages will be enqueued")
@@ -91,8 +91,8 @@ func main() {
9191
}()
9292

9393
switch {
94-
case clusterName == "":
95-
log.Fatal("-cluster is a required option")
94+
case clusterUID == "":
95+
log.Fatal("-cluster-uid is a required option")
9696
case region == "":
9797
log.Fatal("-region is a required option")
9898
case bucket == "":
@@ -106,12 +106,12 @@ func main() {
106106
}
107107

108108
envConfig := EnvConfig{
109-
ClusterName: clusterName,
110-
Region: region,
111-
Version: version,
112-
Bucket: bucket,
113-
APIName: apiName,
114-
JobID: jobID,
109+
ClusterUID: clusterUID,
110+
Region: region,
111+
Version: version,
112+
Bucket: bucket,
113+
APIName: apiName,
114+
JobID: jobID,
115115
}
116116

117117
enqueuer, err := NewEnqueuer(envConfig, queueURL, log)

manager/render_template.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818
import pathlib
1919
from jinja2 import Environment, FileSystemLoader
2020

21+
# python render_template.py [CLUSTER_CONFIG_PATH] TEMPLATE_PATH
2122
if __name__ == "__main__":
22-
cluster_config_path = sys.argv[1]
23-
template_path = pathlib.Path(sys.argv[2])
23+
if len(sys.argv) == 3:
24+
yaml_file_path = sys.argv[1]
25+
template_path = pathlib.Path(sys.argv[2])
26+
elif len(sys.argv) == 2:
27+
yaml_file_path = None
28+
template_path = pathlib.Path(sys.argv[1])
29+
else:
30+
raise RuntimeError(f"incorrect number of parameters ({len(sys.argv)})")
2431

2532
file_loader = FileSystemLoader(str(template_path.parent))
2633
env = Environment(loader=file_loader)
@@ -29,6 +36,10 @@
2936
env.rstrip_blocks = True
3037

3138
template = env.get_template(str(template_path.name))
32-
with open(cluster_config_path, "r") as f:
33-
cluster_config = yaml.safe_load(f)
34-
print(template.render(config=cluster_config, env=os.environ))
39+
40+
if yaml_file_path:
41+
with open(yaml_file_path, "r") as f:
42+
yaml_data = yaml.safe_load(f)
43+
print(template.render(config=yaml_data, env=os.environ))
44+
else:
45+
print(template.render(env=os.environ))

pkg/consts/consts.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ var (
4949
DefaultMaxReplicaConcurrency = int64(1024)
5050
NeuronCoresPerInf = int64(4)
5151
AuthHeader = "X-Cortex-Authorization"
52+
53+
MaxBucketLifecycleRules = 100
54+
AsyncWorkloadsExpirationDays = int64(7)
5255
)
5356

5457
func DefaultRegistry() string {

0 commit comments

Comments
 (0)