diff --git a/pkg/operator/workloads/api_workload.go b/pkg/operator/workloads/api_workload.go index 9f4fe4f235..788e4e9829 100644 --- a/pkg/operator/workloads/api_workload.go +++ b/pkg/operator/workloads/api_workload.go @@ -20,7 +20,6 @@ import ( "path" kapps "k8s.io/api/apps/v1" - kautoscaling "k8s.io/api/autoscaling/v2beta2" kcore "k8s.io/api/core/v1" kextensions "k8s.io/api/extensions/v1beta1" kresource "k8s.io/apimachinery/pkg/api/resource" @@ -112,7 +111,8 @@ func (aw *APIWorkload) Start(ctx *context.Context) error { return err } - _, err = config.Kubernetes.ApplyHPA(hpaSpec(ctx, api)) + // Delete HPA while updating replicas to avoid unwanted autoscaling + _, err = config.Kubernetes.DeleteHPA(k8sDeloymentName) if err != nil { return err } @@ -132,12 +132,7 @@ func (aw *APIWorkload) IsSucceeded(ctx *context.Context) (bool, error) { return false, nil } - hpa, err := config.Kubernetes.GetHPA(k8sDeloymentName) - if err != nil { - return false, err - } - - if doesAPIComputeNeedsUpdating(api, k8sDeployment, hpa) { + if doesAPIComputeNeedsUpdating(api, k8sDeployment) { return false, nil } @@ -145,7 +140,7 @@ func (aw *APIWorkload) IsSucceeded(ctx *context.Context) (bool, error) { if err != nil { return false, err } - requestedReplicas := getRequestedReplicasFromDeployment(api, k8sDeployment, hpa) + requestedReplicas := getRequestedReplicasFromDeployment(api, k8sDeployment, nil) if updatedReplicas < requestedReplicas { return false, nil } @@ -165,12 +160,7 @@ func (aw *APIWorkload) IsRunning(ctx *context.Context) (bool, error) { return false, nil } - hpa, err := config.Kubernetes.GetHPA(k8sDeloymentName) - if err != nil { - return false, err - } - - if doesAPIComputeNeedsUpdating(api, k8sDeployment, hpa) { + if doesAPIComputeNeedsUpdating(api, k8sDeployment) { return false, nil } @@ -178,7 +168,7 @@ func (aw *APIWorkload) IsRunning(ctx *context.Context) (bool, error) { if err != nil { return false, err } - requestedReplicas := getRequestedReplicasFromDeployment(api, k8sDeployment, hpa) + requestedReplicas := getRequestedReplicasFromDeployment(api, k8sDeployment, nil) if updatedReplicas < requestedReplicas { return true, nil } @@ -198,12 +188,7 @@ func (aw *APIWorkload) IsStarted(ctx *context.Context) (bool, error) { return false, nil } - hpa, err := config.Kubernetes.GetHPA(k8sDeloymentName) - if err != nil { - return false, err - } - - if doesAPIComputeNeedsUpdating(api, k8sDeployment, hpa) { + if doesAPIComputeNeedsUpdating(api, k8sDeployment) { return false, nil } @@ -488,26 +473,7 @@ func serviceSpec(ctx *context.Context, api *context.API) *kcore.Service { }) } -func hpaSpec(ctx *context.Context, api *context.API) *kautoscaling.HorizontalPodAutoscaler { - return k8s.HPA(&k8s.HPASpec{ - DeploymentName: internalAPIName(api.Name, ctx.App.Name), - MinReplicas: api.Compute.MinReplicas, - MaxReplicas: api.Compute.MaxReplicas, - TargetCPUUtilization: api.Compute.TargetCPUUtilization, - Labels: map[string]string{ - "appName": ctx.App.Name, - "workloadType": workloadTypeAPI, - "apiName": api.Name, - }, - Namespace: config.Cortex.Namespace, - }) -} - -func doesAPIComputeNeedsUpdating(api *context.API, deployment *kapps.Deployment, hpa *kautoscaling.HorizontalPodAutoscaler) bool { - if !k8s.IsHPAUpToDate(hpa, api.Compute.MinReplicas, api.Compute.MaxReplicas, api.Compute.TargetCPUUtilization) { - return true - } - +func doesAPIComputeNeedsUpdating(api *context.API, deployment *kapps.Deployment) bool { curCPU, curMem, curGPU := APIPodCompute(deployment.Spec.Template.Spec.Containers) if !userconfig.QuantityPtrsEqual(curCPU, &api.Compute.CPU) { return true @@ -587,29 +553,6 @@ func addToDeploymentMap(deployments map[string]*kapps.Deployment, deployment kap deployments[apiName] = &deployment } -// This returns map apiName -> hpa (not internalName -> hpa) -func apiHPAMap(appName string) (map[string]*kautoscaling.HorizontalPodAutoscaler, error) { - hpaList, err := config.Kubernetes.ListHPAsByLabels(map[string]string{ - "appName": appName, - "workloadType": workloadTypeAPI, - }) - if err != nil { - return nil, errors.Wrap(err, appName) - } - - hpas := make(map[string]*kautoscaling.HorizontalPodAutoscaler, len(hpaList)) - for _, hpa := range hpaList { - addToHPAMap(hpas, hpa) - } - return hpas, nil -} - -// Avoid pointer in loop issues -func addToHPAMap(hpas map[string]*kautoscaling.HorizontalPodAutoscaler, hpa kautoscaling.HorizontalPodAutoscaler) { - apiName := hpa.Labels["apiName"] - hpas[apiName] = &hpa -} - func internalAPIName(apiName string, appName string) string { return appName + "----" + apiName } diff --git a/pkg/operator/workloads/hpa_workload.go b/pkg/operator/workloads/hpa_workload.go new file mode 100644 index 0000000000..0ab220562e --- /dev/null +++ b/pkg/operator/workloads/hpa_workload.go @@ -0,0 +1,134 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "time" + + kapps "k8s.io/api/apps/v1" + kautoscaling "k8s.io/api/autoscaling/v2beta2" + kcore "k8s.io/api/core/v1" + + "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/operator/api/context" + "github.com/cortexlabs/cortex/pkg/operator/config" +) + +type HPAWorkload struct { + BaseWorkload + APIID string +} + +func extractHPAWorkloads(ctx *context.Context) []Workload { + workloads := make([]Workload, 0, len(ctx.APIs)) + + for _, api := range ctx.APIs { + workloads = append(workloads, &HPAWorkload{ + BaseWorkload: emptyBaseWorkload(ctx.App.Name, api.WorkloadID, workloadTypeHPA), // HPA doesn't produce any resources + APIID: api.ID, + }) + } + + return workloads +} + +func (hw *HPAWorkload) Start(ctx *context.Context) error { + api := ctx.APIs.OneByID(hw.APIID) + + _, err := config.Kubernetes.ApplyHPA(hpaSpec(ctx, api)) + if err != nil { + return err + } + + return nil +} + +func (hw *HPAWorkload) IsSucceeded(ctx *context.Context) (bool, error) { + api := ctx.APIs.OneByID(hw.APIID) + k8sDeloymentName := internalAPIName(api.Name, ctx.App.Name) + + hpa, err := config.Kubernetes.GetHPA(k8sDeloymentName) + if err != nil { + return false, err + } + + return k8s.IsHPAUpToDate(hpa, api.Compute.MinReplicas, api.Compute.MaxReplicas, api.Compute.TargetCPUUtilization), nil +} + +func (hw *HPAWorkload) IsRunning(ctx *context.Context) (bool, error) { + return false, nil +} + +func (hw *HPAWorkload) IsStarted(ctx *context.Context) (bool, error) { + return hw.IsSucceeded(ctx) +} + +func (hw *HPAWorkload) CanRun(ctx *context.Context) (bool, error) { + api := ctx.APIs.OneByID(hw.APIID) + k8sDeloymentName := internalAPIName(api.Name, ctx.App.Name) + + k8sDeployment, err := config.Kubernetes.GetDeployment(k8sDeloymentName) + if err != nil { + return false, err + } + if k8sDeployment == nil || k8sDeployment.Labels["resourceID"] != api.ID || k8sDeployment.DeletionTimestamp != nil { + return false, nil + } + + if doesAPIComputeNeedsUpdating(api, k8sDeployment) { + return false, nil + } + + updatedReplicas, err := numUpdatedReadyReplicas(ctx, api) + if err != nil { + return false, err + } + requestedReplicas := getRequestedReplicasFromDeployment(api, k8sDeployment, nil) + if updatedReplicas < requestedReplicas { + return false, nil + } + + for _, condition := range k8sDeployment.Status.Conditions { + if condition.Type == kapps.DeploymentProgressing && + condition.Status == kcore.ConditionTrue && + !condition.LastUpdateTime.IsZero() && + time.Now().After(condition.LastUpdateTime.Add(35*time.Second)) { // the metrics poll interval is 30 seconds, so 35 should be safe + return true, nil + } + } + + return false, nil +} + +func (hw *HPAWorkload) IsFailed(ctx *context.Context) (bool, error) { + return false, nil +} + +func hpaSpec(ctx *context.Context, api *context.API) *kautoscaling.HorizontalPodAutoscaler { + return k8s.HPA(&k8s.HPASpec{ + DeploymentName: internalAPIName(api.Name, ctx.App.Name), + MinReplicas: api.Compute.MinReplicas, + MaxReplicas: api.Compute.MaxReplicas, + TargetCPUUtilization: api.Compute.TargetCPUUtilization, + Labels: map[string]string{ + "appName": ctx.App.Name, + "workloadType": workloadTypeAPI, + "apiName": api.Name, + }, + Namespace: config.Cortex.Namespace, + }) +} diff --git a/pkg/operator/workloads/workflow.go b/pkg/operator/workloads/workflow.go index 80d722ef4c..04eaf2ef37 100644 --- a/pkg/operator/workloads/workflow.go +++ b/pkg/operator/workloads/workflow.go @@ -63,6 +63,7 @@ func extractWorkloads(ctx *context.Context) []Workload { workloads = append(workloads, extractSparkWorkloads(ctx)...) workloads = append(workloads, extractTrainingWorkloads(ctx)...) workloads = append(workloads, extractAPIWorkloads(ctx)...) + workloads = append(workloads, extractHPAWorkloads(ctx)...) return workloads } @@ -290,6 +291,12 @@ func IsDeploymentUpdating(appName string) (bool, error) { } for _, workload := range extractWorkloads(ctx) { + + // Pending HPA workloads shouldn't block new deployments + if workload.GetWorkloadType() == workloadTypeHPA { + continue + } + isSucceeded, err := workload.IsSucceeded(ctx) if err != nil { return false, err diff --git a/pkg/operator/workloads/workload.go b/pkg/operator/workloads/workload.go index 63e53d5584..194840a01e 100644 --- a/pkg/operator/workloads/workload.go +++ b/pkg/operator/workloads/workload.go @@ -23,6 +23,7 @@ import ( const ( workloadTypeAPI = "api" + workloadTypeHPA = "hpa" workloadTypeSpark = "spark-job" workloadTypeTrain = "training-job" workloadTypePythonPackager = "python-packager"