From 782a1308c231a590756239552424c214bdd3fb15 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Fri, 16 Jul 2021 14:20:03 +0200 Subject: [PATCH 1/4] Fix autoscaler behaviour when considering zero-value recommendations and remove dependency on lastAwakenMap --- pkg/autoscaler/autoscaler.go | 64 ++++++++++++------------------- pkg/autoscaler/recommendations.go | 42 +++++++++++++++----- 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index 37f9909e36..391892ad18 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -19,12 +19,12 @@ package autoscaler import ( "fmt" "math" - "sync" "time" "github.com/cortexlabs/cortex/pkg/lib/cron" "github.com/cortexlabs/cortex/pkg/lib/errors" libmath "github.com/cortexlabs/cortex/pkg/lib/math" + "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/telemetry" libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -44,19 +44,17 @@ type Scaler interface { } type Autoscaler struct { - sync.Mutex - logger *zap.SugaredLogger - crons map[string]cron.Cron - scalers map[userconfig.Kind]Scaler - lastAwakenTimestamp map[string]time.Time + logger *zap.SugaredLogger + crons map[string]cron.Cron + scalers map[userconfig.Kind]Scaler + recs map[string]*recommendations } func New(logger *zap.SugaredLogger) *Autoscaler { return &Autoscaler{ - logger: logger, - crons: make(map[string]cron.Cron), - scalers: make(map[userconfig.Kind]Scaler), - lastAwakenTimestamp: make(map[string]time.Time), + logger: logger, + crons: make(map[string]cron.Cron), + scalers: make(map[userconfig.Kind]Scaler), } } @@ -65,9 +63,6 @@ func (a *Autoscaler) AddScaler(scaler Scaler, kind userconfig.Kind) { } func (a *Autoscaler) Awaken(api userconfig.Resource) error { - a.Lock() - defer a.Unlock() - scaler, ok := a.scalers[api.Kind] if !ok { return errors.ErrorUnexpected( @@ -94,7 +89,7 @@ func (a 
*Autoscaler) Awaken(api userconfig.Resource) error { return errors.Wrap(err, "failed to scale api to one") } - a.lastAwakenTimestamp[api.Name] = time.Now() + a.recs[api.Name].add(1) return nil } @@ -104,11 +99,6 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error { return nil } - autoscaleFn, err := a.autoscaleFn(api) - if err != nil { - return err - } - errorHandler := func(err error) { log := a.logger.With( zap.String("apiName", api.Name), @@ -119,12 +109,12 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error { telemetry.Error(err) } - a.crons[api.Name] = cron.Run(autoscaleFn, errorHandler, spec.AutoscalingTickInterval) + autoscaleFn, err := a.autoscaleFn(api) + if err != nil { + return err + } - // make sure there is no awaken call registered to an older API with the same name - a.Lock() - delete(a.lastAwakenTimestamp, api.Name) - a.Unlock() + a.crons[api.Name] = cron.Run(autoscaleFn, errorHandler, spec.AutoscalingTickInterval) return nil } @@ -140,10 +130,7 @@ func (a *Autoscaler) RemoveAPI(api userconfig.Resource) { delete(a.crons, api.Name) } - a.Lock() - delete(a.lastAwakenTimestamp, api.Name) - a.Unlock() - + delete(a.recs, api.Name) log.Info("autoscaler stop") } @@ -175,7 +162,7 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error) log.Info("autoscaler init") var startTime time.Time - recs := make(recommendations) + a.recs[api.Name] = newRecommendations() return func() error { currentReplicas, err := scaler.CurrentReplicas(api.Name) @@ -227,6 +214,8 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error) recommendation = autoscalingSpec.MaxReplicas } + recs := a.recs[api.Name] + // Rule of thumb: any modifications that don't consider historical recommendations should be performed before // recording the recommendation, any modifications that use historical recommendations should be performed after recs.add(recommendation) @@ -240,25 +229,20 @@ func (a *Autoscaler) autoscaleFn(api 
userconfig.Resource) (func() error, error) if request < currentReplicas { downscaleStabilizationFloor = recs.maxSince(autoscalingSpec.DownscaleStabilizationPeriod) + if downscaleStabilizationFloor != nil { + downscaleStabilizationFloor = pointer.Int32(libmath.MaxInt32(*downscaleStabilizationFloor, 1)) + } if time.Since(startTime) < autoscalingSpec.DownscaleStabilizationPeriod { request = currentReplicas } else if downscaleStabilizationFloor != nil && request < *downscaleStabilizationFloor { request = *downscaleStabilizationFloor } - - // awaken state: was scaled from zero - // This needs to be protected by a Mutex because an Awaken call will also modify it - a.Lock() - lastAwakenTimestamp := a.lastAwakenTimestamp[api.Name] - - // Make sure we don't scale below zero if API was recently awaken - if time.Since(lastAwakenTimestamp) < autoscalingSpec.DownscaleStabilizationPeriod { - request = libmath.MaxInt32(request, 1) - } - a.Unlock() } if request > currentReplicas { upscaleStabilizationCeil = recs.minSince(autoscalingSpec.UpscaleStabilizationPeriod) + if upscaleStabilizationCeil != nil { + upscaleStabilizationCeil = pointer.Int32(libmath.MaxInt32(*upscaleStabilizationCeil, currentReplicas)) + } if time.Since(startTime) < autoscalingSpec.UpscaleStabilizationPeriod { request = currentReplicas } else if upscaleStabilizationCeil != nil && request > *upscaleStabilizationCeil { diff --git a/pkg/autoscaler/recommendations.go b/pkg/autoscaler/recommendations.go index 300dd93563..4a5385ff7a 100644 --- a/pkg/autoscaler/recommendations.go +++ b/pkg/autoscaler/recommendations.go @@ -18,29 +18,48 @@ package autoscaler import ( "math" + "sync" "time" ) -type recommendations map[time.Time]int32 +type recommendations struct { + mux sync.RWMutex + timeline map[time.Time]int32 +} + +func newRecommendations() *recommendations { + return &recommendations{ + timeline: make(map[time.Time]int32), + } +} -func (r recommendations) add(rec int32) { - r[time.Now()] = rec +func (r 
*recommendations) add(rec int32) { + r.mux.Lock() + defer r.mux.Unlock() + + r.timeline[time.Now()] = rec } -func (r recommendations) deleteOlderThan(period time.Duration) { - for t := range r { +func (r *recommendations) deleteOlderThan(period time.Duration) { + r.mux.Lock() + defer r.mux.Unlock() + + for t := range r.timeline { if time.Since(t) > period { - delete(r, t) + delete(r.timeline, t) } } } // Returns nil if no recommendations in the period -func (r recommendations) maxSince(period time.Duration) *int32 { +func (r *recommendations) maxSince(period time.Duration) *int32 { + r.mux.RLock() + defer r.mux.RUnlock() + max := int32(math.MinInt32) foundRecommendation := false - for t, rec := range r { + for t, rec := range r.timeline { if time.Since(t) <= period && rec > max { max = rec foundRecommendation = true @@ -55,11 +74,14 @@ func (r recommendations) maxSince(period time.Duration) *int32 { } // Returns nil if no recommendations in the period -func (r recommendations) minSince(period time.Duration) *int32 { +func (r *recommendations) minSince(period time.Duration) *int32 { + r.mux.RLock() + defer r.mux.RUnlock() + min := int32(math.MaxInt32) foundRecommendation := false - for t, rec := range r { + for t, rec := range r.timeline { if time.Since(t) <= period && rec < min { min = rec foundRecommendation = true From b979aa6883c52d4ef2589d522cd7a3f9dcdcf8b5 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Fri, 16 Jul 2021 14:21:11 +0200 Subject: [PATCH 2/4] Add autoscaler tests with different configurations of the autoscaler --- pkg/autoscaler/autoscaler_test.go | 237 +++++++++++++++++++++++++++--- 1 file changed, 219 insertions(+), 18 deletions(-) diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index 05ccb2ff42..a1df9c30c7 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -41,6 +41,207 @@ func newLogger(t *testing.T) *zap.SugaredLogger { return logr } +func 
generateRecommendationTimeline(t *testing.T, recs []int32, interval time.Duration) *recommendations { + t.Helper() + + startTime := time.Now() + recsTimeline := map[time.Time]int32{} + + for i := range recs { + timestamp := startTime.Add(time.Duration(i) * interval) + recsTimeline[timestamp] = recs[i] + } + + return &recommendations{ + timeline: recsTimeline, + } +} + +func TestAutoscaler_AutoscaleFn(t *testing.T) { + t.Parallel() + log := newLogger(t) + + interval := 250 * time.Millisecond + + cases := []struct { + name string + autoscalingSpec userconfig.Autoscaling + inFlight float64 + currentReplicas int32 + recommendationTimeline []int32 + expectedRequest *int32 + }{ + { + name: "no scale below zero within stabilization period", + autoscalingSpec: userconfig.Autoscaling{ + MinReplicas: 0, + MaxReplicas: 5, + InitReplicas: 0, + TargetInFlight: pointer.Float64(1), + Window: 4 * interval, + DownscaleStabilizationPeriod: time.Second, + UpscaleStabilizationPeriod: time.Second, + MaxDownscaleFactor: 0.75, + MaxUpscaleFactor: 1.5, + DownscaleTolerance: 0.05, + UpscaleTolerance: 0.05, + }, + inFlight: 0, + currentReplicas: 1, + expectedRequest: nil, + }, + { + name: "downscale no stabilization", + autoscalingSpec: userconfig.Autoscaling{ + MinReplicas: 1, + MaxReplicas: 5, + InitReplicas: 1, + TargetInFlight: pointer.Float64(1), + Window: 4 * interval, + DownscaleStabilizationPeriod: 0, + UpscaleStabilizationPeriod: 0, + MaxDownscaleFactor: 0.5, + MaxUpscaleFactor: 1.5, + DownscaleTolerance: 0.05, + UpscaleTolerance: 0.05, + }, + inFlight: 2, + currentReplicas: 5, + recommendationTimeline: []int32{5, 2, 2, 2}, + expectedRequest: pointer.Int32(3), + }, + { + name: "downscale with stabilization", + autoscalingSpec: userconfig.Autoscaling{ + MinReplicas: 1, + MaxReplicas: 5, + InitReplicas: 1, + TargetInFlight: pointer.Float64(1), + Window: 4 * interval, + DownscaleStabilizationPeriod: time.Second, + UpscaleStabilizationPeriod: time.Second, + MaxDownscaleFactor: 0.5, + 
MaxUpscaleFactor: 1.5, + DownscaleTolerance: 0.05, + UpscaleTolerance: 0.05, + }, + inFlight: 5, + currentReplicas: 1, + recommendationTimeline: []int32{5, 5, 2, 2}, + expectedRequest: nil, + }, + { + name: "upscale no stabilization", + autoscalingSpec: userconfig.Autoscaling{ + MinReplicas: 1, + MaxReplicas: 5, + InitReplicas: 1, + TargetInFlight: pointer.Float64(1), + Window: 4 * interval, + DownscaleStabilizationPeriod: 0, + UpscaleStabilizationPeriod: 0, + MaxDownscaleFactor: 0.5, + MaxUpscaleFactor: 1.5, + DownscaleTolerance: 0.05, + UpscaleTolerance: 0.05, + }, + inFlight: 3, + currentReplicas: 1, + expectedRequest: pointer.Int32(2), + }, + { + name: "upscale with stabilization", + autoscalingSpec: userconfig.Autoscaling{ + MinReplicas: 1, + MaxReplicas: 5, + InitReplicas: 1, + TargetInFlight: pointer.Float64(1), + Window: 4 * interval, + DownscaleStabilizationPeriod: time.Second, + UpscaleStabilizationPeriod: time.Second, + MaxDownscaleFactor: 0.5, + MaxUpscaleFactor: 1.5, + DownscaleTolerance: 0.05, + UpscaleTolerance: 0.05, + }, + inFlight: 5, + currentReplicas: 2, + recommendationTimeline: []int32{2, 2, 2, 5}, + expectedRequest: nil, + }, + { + name: "no upscale below current replicas", + autoscalingSpec: userconfig.Autoscaling{ + MinReplicas: 0, + MaxReplicas: 5, + InitReplicas: 0, + TargetInFlight: pointer.Float64(1), + Window: 4 * interval, + DownscaleStabilizationPeriod: time.Second, + UpscaleStabilizationPeriod: time.Second, + MaxDownscaleFactor: 0.75, + MaxUpscaleFactor: 1.5, + DownscaleTolerance: 0.05, + UpscaleTolerance: 0.05, + }, + inFlight: 3, + currentReplicas: 2, + recommendationTimeline: []int32{0, 1, 2, 3}, + expectedRequest: nil, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var latestRequest *int32 + + scalerMock := &ScalerFunc{ + ScaleFunc: func(apiName string, request int32) error { + latestRequest = pointer.Int32(request) + return nil + }, + GetInFlightRequestsFunc: func(apiName 
string, window time.Duration) (*float64, error) { + return pointer.Float64(tt.inFlight), nil + }, + GetAutoscalingSpecFunc: func(apiName string) (*userconfig.Autoscaling, error) { + return &tt.autoscalingSpec, nil + }, + CurrentReplicasFunc: func(apiName string) (int32, error) { + return tt.currentReplicas, nil + }, + } + + autoScaler := &Autoscaler{ + logger: log, + crons: make(map[string]cron.Cron), + scalers: make(map[userconfig.Kind]Scaler), + recs: make(map[string]*recommendations), + } + autoScaler.AddScaler(scalerMock, userconfig.RealtimeAPIKind) + + apiName := "test" + api := userconfig.Resource{ + Name: apiName, + Kind: userconfig.RealtimeAPIKind, + } + + autoScaler.recs[apiName] = generateRecommendationTimeline(t, tt.recommendationTimeline, interval) + autoscaleFn, err := autoScaler.autoscaleFn(api) + require.NoError(t, err) + + time.Sleep(interval) + + err = autoscaleFn() + require.NoError(t, err) + + require.Equal(t, tt.expectedRequest, latestRequest) + }) + } + +} + func TestAutoscaler_Awake(t *testing.T) { t.Parallel() log := newLogger(t) @@ -78,15 +279,16 @@ func TestAutoscaler_Awake(t *testing.T) { } autoScaler := &Autoscaler{ - logger: log, - crons: make(map[string]cron.Cron), - scalers: make(map[userconfig.Kind]Scaler), - lastAwakenTimestamp: make(map[string]time.Time), + logger: log, + crons: make(map[string]cron.Cron), + scalers: make(map[userconfig.Kind]Scaler), + recs: make(map[string]*recommendations), } autoScaler.AddScaler(scalerMock, userconfig.RealtimeAPIKind) + apiName := "test" api := userconfig.Resource{ - Name: "test", + Name: apiName, Kind: userconfig.RealtimeAPIKind, } @@ -107,9 +309,6 @@ func TestAutoscaler_Awake(t *testing.T) { err = autoScaler.Awaken(api) require.NoError(t, err) - _, ok := autoScaler.lastAwakenTimestamp[api.Name] - require.True(t, ok) - require.Never(t, func() bool { mux.RLock() defer mux.RUnlock() @@ -155,15 +354,16 @@ func TestAutoscaler_MinReplicas(t *testing.T) { } autoScaler := &Autoscaler{ - logger: log, - 
crons: make(map[string]cron.Cron), - scalers: make(map[userconfig.Kind]Scaler), - lastAwakenTimestamp: make(map[string]time.Time), + logger: log, + crons: make(map[string]cron.Cron), + scalers: make(map[userconfig.Kind]Scaler), + recs: make(map[string]*recommendations), } autoScaler.AddScaler(scalerMock, userconfig.RealtimeAPIKind) + apiName := "test" api := userconfig.Resource{ - Name: "test", + Name: apiName, Kind: userconfig.RealtimeAPIKind, } @@ -226,15 +426,16 @@ func TestAutoscaler_MaxReplicas(t *testing.T) { } autoScaler := &Autoscaler{ - logger: log, - crons: make(map[string]cron.Cron), - scalers: make(map[userconfig.Kind]Scaler), - lastAwakenTimestamp: make(map[string]time.Time), + logger: log, + crons: make(map[string]cron.Cron), + scalers: make(map[userconfig.Kind]Scaler), + recs: make(map[string]*recommendations), } autoScaler.AddScaler(scalerMock, userconfig.RealtimeAPIKind) + apiName := "test" api := userconfig.Resource{ - Name: "test", + Name: apiName, Kind: userconfig.RealtimeAPIKind, } From 876b7583053256258f348a0acac8540797b958c7 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Fri, 16 Jul 2021 14:34:12 +0200 Subject: [PATCH 3/4] Fix linting error --- pkg/autoscaler/autoscaler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index a1df9c30c7..b96a12fb1a 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -191,7 +191,7 @@ func TestAutoscaler_AutoscaleFn(t *testing.T) { }, } - for _, tt := range cases { + for i, tt := range cases { t.Run(tt.name, func(t *testing.T) { t.Parallel() @@ -206,7 +206,7 @@ func TestAutoscaler_AutoscaleFn(t *testing.T) { return pointer.Float64(tt.inFlight), nil }, GetAutoscalingSpecFunc: func(apiName string) (*userconfig.Autoscaling, error) { - return &tt.autoscalingSpec, nil + return &cases[i].autoscalingSpec, nil }, CurrentReplicasFunc: func(apiName string) (int32, error) { 
return tt.currentReplicas, nil From 349c06c6818601c66c05bff06907cf89acba6a83 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Mon, 19 Jul 2021 11:53:04 +0200 Subject: [PATCH 4/4] Address PR comments --- pkg/autoscaler/autoscaler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index 391892ad18..ba6a57cab1 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -55,6 +55,7 @@ func New(logger *zap.SugaredLogger) *Autoscaler { logger: logger, crons: make(map[string]cron.Cron), scalers: make(map[userconfig.Kind]Scaler), + recs: make(map[string]*recommendations), } } @@ -230,7 +231,7 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error) if request < currentReplicas { downscaleStabilizationFloor = recs.maxSince(autoscalingSpec.DownscaleStabilizationPeriod) if downscaleStabilizationFloor != nil { - downscaleStabilizationFloor = pointer.Int32(libmath.MaxInt32(*downscaleStabilizationFloor, 1)) + downscaleStabilizationFloor = pointer.Int32(libmath.MinInt32(*downscaleStabilizationFloor, currentReplicas)) } if time.Since(startTime) < autoscalingSpec.DownscaleStabilizationPeriod { request = currentReplicas