diff --git a/pkg/lib/k8s/errors.go b/pkg/lib/k8s/errors.go index edc93f1919..402901202a 100644 --- a/pkg/lib/k8s/errors.go +++ b/pkg/lib/k8s/errors.go @@ -30,6 +30,7 @@ const ( ErrParseLabel = "k8s.parse_label" ErrParseAnnotation = "k8s.parse_annotation" ErrParseQuantity = "k8s.parse_quantity" + ErrMissingMetrics = "k8s.missing_metrics" ) func ErrorLabelNotFound(labelName string) error { @@ -66,3 +67,10 @@ func ErrorParseQuantity(qtyStr string) error { Message: fmt.Sprintf("%s: invalid kubernetes quantity, some valid examples are 1, 200m, 500Mi, 2G (see here for more information: https://docs.cortex.dev/v/%s/)", qtyStr, consts.CortexVersionMinor), }) } + +func ErrorMissingMetrics() error { + return errors.WithStack(&errors.Error{ + Kind: ErrMissingMetrics, + Message: "must specify at least one metric", + }) +} diff --git a/pkg/lib/k8s/hpa.go b/pkg/lib/k8s/hpa.go index e360b90ca0..4d500a1ef2 100644 --- a/pkg/lib/k8s/hpa.go +++ b/pkg/lib/k8s/hpa.go @@ -37,11 +37,42 @@ type HPASpec struct { MinReplicas int32 MaxReplicas int32 TargetCPUUtilization int32 + TargetMemUtilization int32 Labels map[string]string Annotations map[string]string } -func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler { +func HPA(spec *HPASpec) (*kautoscaling.HorizontalPodAutoscaler, error) { + metrics := []kautoscaling.MetricSpec{} + if spec.TargetCPUUtilization > 0 { + metrics = append(metrics, kautoscaling.MetricSpec{ + Type: kautoscaling.ResourceMetricSourceType, + Resource: &kautoscaling.ResourceMetricSource{ + Name: kcore.ResourceCPU, + Target: kautoscaling.MetricTarget{ + Type: kautoscaling.UtilizationMetricType, + AverageUtilization: &spec.TargetCPUUtilization, + }, + }, + }) + } + if spec.TargetMemUtilization > 0 { + metrics = append(metrics, kautoscaling.MetricSpec{ + Type: kautoscaling.ResourceMetricSourceType, + Resource: &kautoscaling.ResourceMetricSource{ + Name: kcore.ResourceMemory, + Target: kautoscaling.MetricTarget{ + Type: kautoscaling.UtilizationMetricType, + AverageUtilization: &spec.TargetMemUtilization, + }, + }, + }) + } + + if len(metrics) == 0 { + return nil, ErrorMissingMetrics() + } + hpa := &kautoscaling.HorizontalPodAutoscaler{ TypeMeta: _hpaTypeMeta, ObjectMeta: kmeta.ObjectMeta{ @@ -52,18 +83,7 @@ func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler { Spec: kautoscaling.HorizontalPodAutoscalerSpec{ MinReplicas: &spec.MinReplicas, MaxReplicas: spec.MaxReplicas, - Metrics: []kautoscaling.MetricSpec{ - { - Type: kautoscaling.ResourceMetricSourceType, - Resource: &kautoscaling.ResourceMetricSource{ - Name: kcore.ResourceCPU, - Target: kautoscaling.MetricTarget{ - Type: kautoscaling.UtilizationMetricType, - AverageUtilization: &spec.TargetCPUUtilization, - }, - }, - }, - }, + Metrics: metrics, ScaleTargetRef: kautoscaling.CrossVersionObjectReference{ Kind: _deploymentTypeMeta.Kind, Name: spec.DeploymentName, @@ -71,7 +91,7 @@ func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler { }, }, } - return hpa + return hpa, nil } func (c *Client) CreateHPA(hpa *kautoscaling.HorizontalPodAutoscaler) (*kautoscaling.HorizontalPodAutoscaler, error) { @@ -166,7 +186,7 @@ func HPAMap(hpas []kautoscaling.HorizontalPodAutoscaler) map[string]kautoscaling return hpaMap } -func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas int32, maxReplicas int32, targetCPUUtilization int32) bool { +func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas, maxReplicas, targetCPUUtilization, targetMemUtilization int32) bool { if hpa == nil { return false } @@ -179,21 +199,26 @@ func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas int32, return false } - if len(hpa.Spec.Metrics) != 1 { - return false - } - metric := hpa.Spec.Metrics[0] - if metric.Type != kautoscaling.ResourceMetricSourceType || metric.Resource == nil { - return false - } - if metric.Resource.Name != kcore.ResourceCPU { + if len(hpa.Spec.Metrics) != 2 { return false } - if metric.Resource.Target.Type != kautoscaling.UtilizationMetricType || metric.Resource.Target.AverageUtilization == nil { - return false - } - if *metric.Resource.Target.AverageUtilization != targetCPUUtilization { - return false + + for _, metric := range hpa.Spec.Metrics { + if metric.Type != kautoscaling.ResourceMetricSourceType || metric.Resource == nil { + return false + } + if metric.Resource.Target.Type != kautoscaling.UtilizationMetricType || metric.Resource.Target.AverageUtilization == nil { + return false + } + if metric.Resource.Name != kcore.ResourceCPU && metric.Resource.Name != kcore.ResourceMemory { + return false + } + if metric.Resource.Name == kcore.ResourceCPU && *metric.Resource.Target.AverageUtilization != targetCPUUtilization { + return false + } + if metric.Resource.Name == kcore.ResourceMemory && *metric.Resource.Target.AverageUtilization != targetMemUtilization { + return false + } } return true diff --git a/pkg/operator/resources/asyncapi/api.go b/pkg/operator/resources/asyncapi/api.go index 390e59aab5..1b77f427e4 100644 --- a/pkg/operator/resources/asyncapi/api.go +++ b/pkg/operator/resources/asyncapi/api.go @@ -34,6 +34,7 @@ import ( "github.com/cortexlabs/cortex/pkg/types/userconfig" istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" kapps "k8s.io/api/apps/v1" + kautoscaling "k8s.io/api/autoscaling/v2beta2" kcore "k8s.io/api/core/v1" ) @@ -51,6 +52,7 @@ type resources struct { apiDeployment *kapps.Deployment gatewayDeployment *kapps.Deployment gatewayService *kcore.Service + gatewayHPA *kautoscaling.HorizontalPodAutoscaler gatewayVirtualService *istioclientnetworking.VirtualService } @@ -287,6 +289,7 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) { var deployment *kapps.Deployment var gatewayDeployment *kapps.Deployment var gatewayService *kcore.Service + var gatewayHPA *kautoscaling.HorizontalPodAutoscaler var gatewayVirtualService *istioclientnetworking.VirtualService gatewayK8sName := getGatewayK8sName(apiConfig.Name) @@ -308,6 +311,11 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) { gatewayService, err = config.K8s.GetService(apiK8sName) return err }, + func() error { + var err error + gatewayHPA, err = config.K8s.GetHPA(gatewayK8sName) + return err + }, func() error { var err error gatewayVirtualService, err = config.K8s.GetVirtualService(apiK8sName) @@ -319,6 +327,7 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) { apiDeployment: deployment, gatewayDeployment: gatewayDeployment, gatewayService: gatewayService, + gatewayHPA: gatewayHPA, gatewayVirtualService: gatewayVirtualService, }, err } @@ -326,15 +335,35 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) { func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string) error { apiDeployment := apiDeploymentSpec(api, prevK8sResources.apiDeployment, queueURL) gatewayDeployment := gatewayDeploymentSpec(api, prevK8sResources.gatewayDeployment, queueURL) + gatewayHPA, err := gatewayHPASpec(api) + if err != nil { + return err + } gatewayService := gatewayServiceSpec(api) gatewayVirtualService := gatewayVirtualServiceSpec(api) return parallel.RunFirstErr( func() error { - return applyK8sDeployment(api, prevK8sResources.apiDeployment, &apiDeployment) + err := applyK8sDeployment(prevK8sResources.apiDeployment, &apiDeployment) + if err != nil { + return err + } + + if err := UpdateMetricsCron(&apiDeployment); err != nil { + return err + } + + if err := UpdateAutoscalerCron(&apiDeployment, api); err != nil { + return err + } + + return nil + }, + func() error { + return applyK8sDeployment(prevK8sResources.gatewayDeployment, &gatewayDeployment) }, func() error { - return applyK8sDeployment(api, prevK8sResources.gatewayDeployment, &gatewayDeployment) + return applyK8sHPA(prevK8sResources.gatewayHPA, &gatewayHPA) }, func() error { return applyK8sService(prevK8sResources.gatewayService, &gatewayService) @@ -345,7 +374,7 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string ) } -func applyK8sDeployment(api spec.API, prevDeployment *kapps.Deployment, newDeployment *kapps.Deployment) error { +func applyK8sDeployment(prevDeployment *kapps.Deployment, newDeployment *kapps.Deployment) error { if prevDeployment == nil { _, err := config.K8s.CreateDeployment(newDeployment) if err != nil { @@ -364,15 +393,19 @@ func applyK8sDeployment(api spec.API, prevDeployment *kapps.Deployment, newDeplo return err } } + return nil +} - if err := UpdateMetricsCron(newDeployment); err != nil { - return err +func applyK8sHPA(prevHPA *kautoscaling.HorizontalPodAutoscaler, newHPA *kautoscaling.HorizontalPodAutoscaler) error { + var err error + if prevHPA == nil { + _, err = config.K8s.CreateHPA(newHPA) + } else { + _, err = config.K8s.UpdateHPA(newHPA) } - - if err := UpdateAutoscalerCron(newDeployment, api); err != nil { + if err != nil { return err } - return nil } @@ -403,6 +436,7 @@ func deleteBucketResources(apiName string) error { func deleteK8sResources(apiName string) error { apiK8sName := operator.K8sName(apiName) + gatewayK8sName := getGatewayK8sName(apiName) err := parallel.RunFirstErr( func() error { @@ -419,10 +453,13 @@ func deleteK8sResources(apiName string) error { return err }, func() error { - gatewayK8sName := getGatewayK8sName(apiName) _, err := config.K8s.DeleteDeployment(gatewayK8sName) return err }, + func() error { + _, err := config.K8s.DeleteHPA(gatewayK8sName) + return err + }, func() error { _, err := config.K8s.DeleteService(apiK8sName) return err diff --git a/pkg/operator/resources/asyncapi/k8s_specs.go b/pkg/operator/resources/asyncapi/k8s_specs.go index 722e8013d0..b5550bef8d 100644 --- a/pkg/operator/resources/asyncapi/k8s_specs.go +++ b/pkg/operator/resources/asyncapi/k8s_specs.go @@ -26,16 +26,19 @@ import ( "github.com/cortexlabs/cortex/pkg/types/userconfig" "istio.io/client-go/pkg/apis/networking/v1beta1" kapps "k8s.io/api/apps/v1" + kautoscaling "k8s.io/api/autoscaling/v2beta2" kcore "k8s.io/api/core/v1" ) -var _terminationGracePeriodSeconds int64 = 60 // seconds +var _terminationGracePeriodSeconds int64 = 60 // seconds +var _gatewayHPATargetCPUUtilization int32 = 80 // percentage +var _gatewayHPATargetMemUtilization int32 = 80 // percentage func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL string) kapps.Deployment { container := operator.AsyncGatewayContainers(api, queueURL) return *k8s.Deployment(&k8s.DeploymentSpec{ Name: getGatewayK8sName(api.Name), - Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), + Replicas: 1, MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge), MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable), Selector: map[string]string{ @@ -78,6 +81,35 @@ func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queue }) } +func gatewayHPASpec(api spec.API) (kautoscaling.HorizontalPodAutoscaler, error) { + var maxReplicas int32 = 1 + if api.Autoscaling != nil { + maxReplicas = api.Autoscaling.MaxReplicas + } + hpa, err := k8s.HPA(&k8s.HPASpec{ + DeploymentName: getGatewayK8sName(api.Name), + MinReplicas: 1, + MaxReplicas: maxReplicas, + TargetCPUUtilization: _gatewayHPATargetCPUUtilization, + TargetMemUtilization: _gatewayHPATargetMemUtilization, + Labels: map[string]string{ + "apiName": api.Name, + "apiKind": api.Kind.String(), + "apiID": api.ID, + "specID": api.SpecID, + "deploymentID": api.DeploymentID, + "predictorID": api.PredictorID, + "cortex.dev/api": "true", + "cortex.dev/async": "hpa", + }, + }) + + if err != nil { + return kautoscaling.HorizontalPodAutoscaler{}, err + } + return *hpa, nil +} + func gatewayServiceSpec(api spec.API) kcore.Service { return *k8s.Service(&k8s.ServiceSpec{ Name: operator.K8sName(api.Name), diff --git a/test/e2e/tests/conftest.py b/test/e2e/tests/conftest.py index c6745be8eb..49bcdd17f3 100644 --- a/test/e2e/tests/conftest.py +++ b/test/e2e/tests/conftest.py @@ -122,9 +122,9 @@ def pytest_configure(config): "workload_timeout": 180, # measured in seconds }, "task": { - "jobs": 10 ** 2, + "jobs": 10 ** 4, "concurrency": 4, - "submit_timeout": 60, # measured in seconds + "submit_timeout": 180, # measured in seconds "workload_timeout": 180, # measured in seconds }, },