Skip to content

Add HPA to Async API Gateway to handle different loads of traffic #2079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/lib/k8s/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ErrParseLabel = "k8s.parse_label"
ErrParseAnnotation = "k8s.parse_annotation"
ErrParseQuantity = "k8s.parse_quantity"
ErrMissingMetrics = "k8s.missing_metrics"
)

func ErrorLabelNotFound(labelName string) error {
Expand Down Expand Up @@ -66,3 +67,10 @@ func ErrorParseQuantity(qtyStr string) error {
Message: fmt.Sprintf("%s: invalid kubernetes quantity, some valid examples are 1, 200m, 500Mi, 2G (see here for more information: https://docs.cortex.dev/v/%s/)", qtyStr, consts.CortexVersionMinor),
})
}

// ErrorMissingMetrics builds the error of kind ErrMissingMetrics, used when
// no metric has been specified. The error value is passed through
// errors.WithStack before being returned.
func ErrorMissingMetrics() error {
	missing := &errors.Error{
		Kind:    ErrMissingMetrics,
		Message: "must specify at least one metric",
	}
	return errors.WithStack(missing)
}
81 changes: 53 additions & 28 deletions pkg/lib/k8s/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,42 @@ type HPASpec struct {
MinReplicas int32
MaxReplicas int32
TargetCPUUtilization int32
TargetMemUtilization int32
Labels map[string]string
Annotations map[string]string
}

func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler {
func HPA(spec *HPASpec) (*kautoscaling.HorizontalPodAutoscaler, error) {
metrics := []kautoscaling.MetricSpec{}
if spec.TargetCPUUtilization > 0 {
metrics = append(metrics, kautoscaling.MetricSpec{
Type: kautoscaling.ResourceMetricSourceType,
Resource: &kautoscaling.ResourceMetricSource{
Name: kcore.ResourceCPU,
Target: kautoscaling.MetricTarget{
Type: kautoscaling.UtilizationMetricType,
AverageUtilization: &spec.TargetCPUUtilization,
},
},
})
}
if spec.TargetMemUtilization > 0 {
metrics = append(metrics, kautoscaling.MetricSpec{
Type: kautoscaling.ResourceMetricSourceType,
Resource: &kautoscaling.ResourceMetricSource{
Name: kcore.ResourceMemory,
Target: kautoscaling.MetricTarget{
Type: kautoscaling.UtilizationMetricType,
AverageUtilization: &spec.TargetMemUtilization,
},
},
})
}

if len(metrics) == 0 {
return nil, ErrorMissingMetrics()
}

hpa := &kautoscaling.HorizontalPodAutoscaler{
TypeMeta: _hpaTypeMeta,
ObjectMeta: kmeta.ObjectMeta{
Expand All @@ -52,26 +83,15 @@ func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler {
Spec: kautoscaling.HorizontalPodAutoscalerSpec{
MinReplicas: &spec.MinReplicas,
MaxReplicas: spec.MaxReplicas,
Metrics: []kautoscaling.MetricSpec{
{
Type: kautoscaling.ResourceMetricSourceType,
Resource: &kautoscaling.ResourceMetricSource{
Name: kcore.ResourceCPU,
Target: kautoscaling.MetricTarget{
Type: kautoscaling.UtilizationMetricType,
AverageUtilization: &spec.TargetCPUUtilization,
},
},
},
},
Metrics: metrics,
ScaleTargetRef: kautoscaling.CrossVersionObjectReference{
Kind: _deploymentTypeMeta.Kind,
Name: spec.DeploymentName,
APIVersion: _deploymentTypeMeta.APIVersion,
},
},
}
return hpa
return hpa, nil
}

func (c *Client) CreateHPA(hpa *kautoscaling.HorizontalPodAutoscaler) (*kautoscaling.HorizontalPodAutoscaler, error) {
Expand Down Expand Up @@ -166,7 +186,7 @@ func HPAMap(hpas []kautoscaling.HorizontalPodAutoscaler) map[string]kautoscaling
return hpaMap
}

func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas int32, maxReplicas int32, targetCPUUtilization int32) bool {
func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas, maxReplicas, targetCPUUtilization, targetMemUtilization int32) bool {
if hpa == nil {
return false
}
Expand All @@ -179,21 +199,26 @@ func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas int32,
return false
}

if len(hpa.Spec.Metrics) != 1 {
return false
}
metric := hpa.Spec.Metrics[0]
if metric.Type != kautoscaling.ResourceMetricSourceType || metric.Resource == nil {
return false
}
if metric.Resource.Name != kcore.ResourceCPU {
if len(hpa.Spec.Metrics) != 2 {
return false
}
if metric.Resource.Target.Type != kautoscaling.UtilizationMetricType || metric.Resource.Target.AverageUtilization == nil {
return false
}
if *metric.Resource.Target.AverageUtilization != targetCPUUtilization {
return false

for _, metric := range hpa.Spec.Metrics {
if metric.Type != kautoscaling.ResourceMetricSourceType || metric.Resource == nil {
return false
}
if metric.Resource.Target.Type != kautoscaling.UtilizationMetricType || metric.Resource.Target.AverageUtilization == nil {
return false
}
if metric.Resource.Name != kcore.ResourceCPU && metric.Resource.Name != kcore.ResourceMemory {
return false
}
if metric.Resource.Name == kcore.ResourceCPU && *metric.Resource.Target.AverageUtilization != targetCPUUtilization {
return false
}
if metric.Resource.Name == kcore.ResourceMemory && *metric.Resource.Target.AverageUtilization != targetMemUtilization {
return false
}
}

return true
Expand Down
55 changes: 46 additions & 9 deletions pkg/operator/resources/asyncapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cortexlabs/cortex/pkg/types/userconfig"
istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1"
kapps "k8s.io/api/apps/v1"
kautoscaling "k8s.io/api/autoscaling/v2beta2"
kcore "k8s.io/api/core/v1"
)

Expand All @@ -51,6 +52,7 @@ type resources struct {
apiDeployment *kapps.Deployment
gatewayDeployment *kapps.Deployment
gatewayService *kcore.Service
gatewayHPA *kautoscaling.HorizontalPodAutoscaler
gatewayVirtualService *istioclientnetworking.VirtualService
}

Expand Down Expand Up @@ -287,6 +289,7 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) {
var deployment *kapps.Deployment
var gatewayDeployment *kapps.Deployment
var gatewayService *kcore.Service
var gatewayHPA *kautoscaling.HorizontalPodAutoscaler
var gatewayVirtualService *istioclientnetworking.VirtualService

gatewayK8sName := getGatewayK8sName(apiConfig.Name)
Expand All @@ -308,6 +311,11 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) {
gatewayService, err = config.K8s.GetService(apiK8sName)
return err
},
func() error {
var err error
gatewayHPA, err = config.K8s.GetHPA(gatewayK8sName)
return err
},
func() error {
var err error
gatewayVirtualService, err = config.K8s.GetVirtualService(apiK8sName)
Expand All @@ -319,22 +327,43 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) {
apiDeployment: deployment,
gatewayDeployment: gatewayDeployment,
gatewayService: gatewayService,
gatewayHPA: gatewayHPA,
gatewayVirtualService: gatewayVirtualService,
}, err
}

func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string) error {
apiDeployment := apiDeploymentSpec(api, prevK8sResources.apiDeployment, queueURL)
gatewayDeployment := gatewayDeploymentSpec(api, prevK8sResources.gatewayDeployment, queueURL)
gatewayHPA, err := gatewayHPASpec(api)
if err != nil {
return err
}
gatewayService := gatewayServiceSpec(api)
gatewayVirtualService := gatewayVirtualServiceSpec(api)

return parallel.RunFirstErr(
func() error {
return applyK8sDeployment(api, prevK8sResources.apiDeployment, &apiDeployment)
err := applyK8sDeployment(prevK8sResources.apiDeployment, &apiDeployment)
if err != nil {
return err
}

if err := UpdateMetricsCron(&apiDeployment); err != nil {
return err
}

if err := UpdateAutoscalerCron(&apiDeployment, api); err != nil {
return err
}

return nil
},
func() error {
return applyK8sDeployment(prevK8sResources.gatewayDeployment, &gatewayDeployment)
},
func() error {
return applyK8sDeployment(api, prevK8sResources.gatewayDeployment, &gatewayDeployment)
return applyK8sHPA(prevK8sResources.gatewayHPA, &gatewayHPA)
},
func() error {
return applyK8sService(prevK8sResources.gatewayService, &gatewayService)
Expand All @@ -345,7 +374,7 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string
)
}

func applyK8sDeployment(api spec.API, prevDeployment *kapps.Deployment, newDeployment *kapps.Deployment) error {
func applyK8sDeployment(prevDeployment *kapps.Deployment, newDeployment *kapps.Deployment) error {
if prevDeployment == nil {
_, err := config.K8s.CreateDeployment(newDeployment)
if err != nil {
Expand All @@ -364,15 +393,19 @@ func applyK8sDeployment(api spec.API, prevDeployment *kapps.Deployment, newDeplo
return err
}
}
return nil
}

if err := UpdateMetricsCron(newDeployment); err != nil {
return err
func applyK8sHPA(prevHPA *kautoscaling.HorizontalPodAutoscaler, newHPA *kautoscaling.HorizontalPodAutoscaler) error {
var err error
if prevHPA == nil {
_, err = config.K8s.CreateHPA(newHPA)
} else {
_, err = config.K8s.UpdateHPA(newHPA)
}

if err := UpdateAutoscalerCron(newDeployment, api); err != nil {
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -403,6 +436,7 @@ func deleteBucketResources(apiName string) error {

func deleteK8sResources(apiName string) error {
apiK8sName := operator.K8sName(apiName)
gatewayK8sName := getGatewayK8sName(apiName)

err := parallel.RunFirstErr(
func() error {
Expand All @@ -419,10 +453,13 @@ func deleteK8sResources(apiName string) error {
return err
},
func() error {
gatewayK8sName := getGatewayK8sName(apiName)
_, err := config.K8s.DeleteDeployment(gatewayK8sName)
return err
},
func() error {
_, err := config.K8s.DeleteHPA(gatewayK8sName)
return err
},
func() error {
_, err := config.K8s.DeleteService(apiK8sName)
return err
Expand Down
36 changes: 34 additions & 2 deletions pkg/operator/resources/asyncapi/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"github.com/cortexlabs/cortex/pkg/types/userconfig"
"istio.io/client-go/pkg/apis/networking/v1beta1"
kapps "k8s.io/api/apps/v1"
kautoscaling "k8s.io/api/autoscaling/v2beta2"
kcore "k8s.io/api/core/v1"
)

var _terminationGracePeriodSeconds int64 = 60 // seconds
var _terminationGracePeriodSeconds int64 = 60 // seconds
var _gatewayHPATargetCPUUtilization int32 = 80 // percentage
var _gatewayHPATargetMemUtilization int32 = 80 // percentage

func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL string) kapps.Deployment {
container := operator.AsyncGatewayContainers(api, queueURL)
return *k8s.Deployment(&k8s.DeploymentSpec{
Name: getGatewayK8sName(api.Name),
Replicas: getRequestedReplicasFromDeployment(api, prevDeployment),
Replicas: 1,
MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge),
MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable),
Selector: map[string]string{
Expand Down Expand Up @@ -78,6 +81,35 @@ func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queue
})
}

// gatewayHPASpec builds the HorizontalPodAutoscaler for the async gateway
// deployment of the given API.
//
// The HPA targets the deployment named by getGatewayK8sName(api.Name), uses a
// minimum of 1 replica, and caps replicas at api.Autoscaling.MaxReplicas
// (falling back to 1 when api.Autoscaling is nil). CPU and memory utilization
// targets are taken from _gatewayHPATargetCPUUtilization and
// _gatewayHPATargetMemUtilization.
//
// If k8s.HPA fails, a zero-value HorizontalPodAutoscaler is returned together
// with that error.
func gatewayHPASpec(api spec.API) (kautoscaling.HorizontalPodAutoscaler, error) {
	// Without an autoscaling config the gateway is pinned to a single replica.
	replicaCeiling := int32(1)
	if api.Autoscaling != nil {
		replicaCeiling = api.Autoscaling.MaxReplicas
	}

	gatewayName := getGatewayK8sName(api.Name)
	hpaLabels := map[string]string{
		"apiName":          api.Name,
		"apiKind":          api.Kind.String(),
		"apiID":            api.ID,
		"specID":           api.SpecID,
		"deploymentID":     api.DeploymentID,
		"predictorID":      api.PredictorID,
		"cortex.dev/api":   "true",
		"cortex.dev/async": "hpa",
	}

	built, err := k8s.HPA(&k8s.HPASpec{
		DeploymentName:       gatewayName,
		MinReplicas:          1,
		MaxReplicas:          replicaCeiling,
		TargetCPUUtilization: _gatewayHPATargetCPUUtilization,
		TargetMemUtilization: _gatewayHPATargetMemUtilization,
		Labels:               hpaLabels,
	})
	if err != nil {
		return kautoscaling.HorizontalPodAutoscaler{}, err
	}

	return *built, nil
}

func gatewayServiceSpec(api spec.API) kcore.Service {
return *k8s.Service(&k8s.ServiceSpec{
Name: operator.K8sName(api.Name),
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def pytest_configure(config):
"workload_timeout": 180, # measured in seconds
},
"task": {
"jobs": 10 ** 2,
"jobs": 10 ** 4,
"concurrency": 4,
"submit_timeout": 60, # measured in seconds
"submit_timeout": 180, # measured in seconds
"workload_timeout": 180, # measured in seconds
},
},
Expand Down