diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 37f09067d8..0e7a9d4325 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -213,6 +213,7 @@ max_instances: 5 image_operator: /cortexlabs/operator:latest image_manager: gcr.io//cortexlabs/manager:latest image_downloader: gcr.io//cortexlabs/downloader:latest +image_request_monitor: gcr.io//cortexlabs/request-monitor:latest image_istio_proxy: gcr.io//cortexlabs/istio-proxy:latest image_istio_pilot: gcr.io//cortexlabs/istio-pilot:latest image_google_pause: gcr.io//cortexlabs/google-pause:latest diff --git a/docs/clusters/gcp/install.md b/docs/clusters/gcp/install.md index 966270617b..32220e3e87 100644 --- a/docs/clusters/gcp/install.md +++ b/docs/clusters/gcp/install.md @@ -74,6 +74,7 @@ The docker images used by the Cortex cluster can also be overridden, although th image_operator: quay.io/cortexlabs/operator:master image_manager: quay.io/cortexlabs/manager:master image_downloader: quay.io/cortexlabs/downloader:master +image_request_monitor: quay.io/cortexlabs/request-monitor:master image_istio_proxy: quay.io/cortexlabs/istio-proxy:master image_istio_pilot: quay.io/cortexlabs/istio-pilot:master image_google_pause: quay.io/cortexlabs/google-pause:master diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go index 25e74afa3f..86aa48c965 100644 --- a/pkg/operator/operator/k8s.go +++ b/pkg/operator/operator/k8s.go @@ -930,9 +930,16 @@ func neuronRuntimeDaemonContainer(api *spec.API, volumeMounts []kcore.VolumeMoun } func RequestMonitorContainer(api *spec.API) kcore.Container { + var image string + if config.Provider == types.AWSProviderType { + image = config.CoreConfig.ImageRequestMonitor + } else if config.Provider == types.GCPProviderType { + image = config.GCPCoreConfig.ImageRequestMonitor + } + return kcore.Container{ Name: _requestMonitorContainerName, - Image: config.CoreConfig.ImageRequestMonitor, + Image: image, ImagePullPolicy: kcore.PullAlways, Args: []string{"-p", DefaultRequestMonitorPortStr}, Ports: []kcore.ContainerPort{ diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 92bcf7193a..eeeae90f0d 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -29,7 +29,6 @@ import ( "github.com/cortexlabs/cortex/pkg/operator/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" - "github.com/cortexlabs/cortex/pkg/types" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/status" "github.com/cortexlabs/cortex/pkg/types/userconfig" @@ -325,10 +324,8 @@ func applyK8sDeployment(api *spec.API, prevDeployment *kapps.Deployment) error { } } - if config.Provider == types.AWSProviderType { - if err := UpdateAutoscalerCron(newDeployment, api); err != nil { - return err - } + if err := UpdateAutoscalerCron(newDeployment, api); err != nil { + return err } return nil diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go index 860c0ee71a..7c7d9831e1 100644 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ b/pkg/operator/resources/realtimeapi/k8s_specs.go @@ -19,9 +19,7 @@ package realtimeapi import ( "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/pointer" - "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" - "github.com/cortexlabs/cortex/pkg/types" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/userconfig" istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" @@ -45,12 +43,8 @@ func deploymentSpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Depl } func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment { - containers, volumes := operator.TensorFlowPredictorContainers(api) - - if config.Provider == types.AWSProviderType { - containers = append(containers, operator.RequestMonitorContainer(api)) - } + containers = append(containers, operator.RequestMonitorContainer(api)) return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), @@ -98,10 +92,7 @@ func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.D func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment { containers, volumes := operator.PythonPredictorContainers(api) - - if config.Provider == types.AWSProviderType { - containers = append(containers, operator.RequestMonitorContainer(api)) - } + containers = append(containers, operator.RequestMonitorContainer(api)) return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), @@ -149,10 +140,7 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment { containers, volumes := operator.ONNXPredictorContainers(api) - - if config.Provider == types.AWSProviderType { - containers = append(containers, operator.RequestMonitorContainer(api)) - } + containers = append(containers, operator.RequestMonitorContainer(api)) return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), diff --git a/pkg/types/clusterconfig/cluster_config_gcp.go b/pkg/types/clusterconfig/cluster_config_gcp.go index 565910fe96..fa977e9ad7 100644 --- a/pkg/types/clusterconfig/cluster_config_gcp.go +++ b/pkg/types/clusterconfig/cluster_config_gcp.go @@ -47,6 +47,7 @@ type GCPCoreConfig struct { ImageOperator string `json:"image_operator" yaml:"image_operator"` ImageManager string `json:"image_manager" yaml:"image_manager"` ImageDownloader string `json:"image_downloader" yaml:"image_downloader"` + ImageRequestMonitor string `json:"image_request_monitor" yaml:"image_request_monitor"` ImageClusterAutoscaler string `json:"image_cluster_autoscaler" yaml:"image_cluster_autoscaler"` ImageFluentBit string `json:"image_fluent_bit" yaml:"image_fluent_bit"` ImageIstioProxy string `json:"image_istio_proxy" yaml:"image_istio_proxy"` @@ -167,6 +168,13 @@ var GCPCoreConfigStructFieldValidations = []*cr.StructFieldValidation{ Validator: validateImageVersion, }, }, + { + StructField: "ImageRequestMonitor", + StringValidation: &cr.StringValidation{ + Default: "quay.io/cortexlabs/request-monitor:" + consts.CortexVersion, + Validator: validateImageVersion, + }, + }, { StructField: "ImageClusterAutoscaler", StringValidation: &cr.StringValidation{ @@ -655,6 +663,7 @@ func (cc *GCPCoreConfig) UserTable() table.KeyValuePairs { items.Add(ImageOperatorUserKey, cc.ImageOperator) items.Add(ImageManagerUserKey, cc.ImageManager) items.Add(ImageDownloaderUserKey, cc.ImageDownloader) + items.Add(ImageRequestMonitorUserKey, cc.ImageRequestMonitor) items.Add(ImageClusterAutoscalerUserKey, cc.ImageClusterAutoscaler) items.Add(ImageFluentBitUserKey, cc.ImageFluentBit) items.Add(ImageIstioProxyUserKey, cc.ImageIstioProxy) @@ -739,6 +748,9 @@ func (cc *GCPCoreConfig) TelemetryEvent() map[string]interface{} { if !strings.HasPrefix(cc.ImageDownloader, "cortexlabs/") { event["image_downloader._is_custom"] = true } + if !strings.HasPrefix(cc.ImageRequestMonitor, "cortexlabs/") { + event["image_request_monitor._is_custom"] = true + } if !strings.HasPrefix(cc.ImageClusterAutoscaler, "cortexlabs/") { event["image_cluster_autoscaler._is_custom"] = true } diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index bfabe2d9c8..20a216af06 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -59,27 +59,27 @@ func apiValidation( case userconfig.RealtimeAPIKind: structFieldValidations = append(resourceStructValidations, predictorValidation(), - networkingValidation(resource.Kind, provider), + networkingValidation(), computeValidation(provider), - autoscalingValidation(provider), - updateStrategyValidation(provider), + autoscalingValidation(), + updateStrategyValidation(), ) case userconfig.BatchAPIKind: structFieldValidations = append(resourceStructValidations, predictorValidation(), - networkingValidation(resource.Kind, provider), + networkingValidation(), computeValidation(provider), ) case userconfig.TaskAPIKind: structFieldValidations = append(resourceStructValidations, taskDefinitionValidation(), - networkingValidation(resource.Kind, provider), + networkingValidation(), computeValidation(provider), ) case userconfig.TrafficSplitterKind: structFieldValidations = append(resourceStructValidations, multiAPIsValidation(), - networkingValidation(resource.Kind, provider), + networkingValidation(), ) } return &cr.StructValidation{ @@ -315,10 +315,7 @@ func taskDefinitionValidation() *cr.StructFieldValidation { } } -func networkingValidation( - kind userconfig.Kind, - provider types.ProviderType, -) *cr.StructFieldValidation { +func networkingValidation() *cr.StructFieldValidation { return &cr.StructFieldValidation{ StructField: "Networking", StructValidation: &cr.StructValidation{ @@ -396,8 +393,8 @@ func computeValidation(provider types.ProviderType) *cr.StructFieldValidation { return structFieldValidation } -func autoscalingValidation(provider types.ProviderType) *cr.StructFieldValidation { - structFieldValidation := &cr.StructFieldValidation{ +func autoscalingValidation() *cr.StructFieldValidation { + return &cr.StructFieldValidation{ StructField: "Autoscaling", StructValidation: &cr.StructValidation{ StructFieldValidations: []*cr.StructFieldValidation{ @@ -432,140 +429,76 @@ func autoscalingValidation(provider types.ProviderType) *cr.StructFieldValidatio LessThanOrEqualTo: pointer.Int64(30000), }, }, - }, - }, - } - - if provider == types.AWSProviderType { - structFieldValidation.StructValidation.StructFieldValidations = append(structFieldValidation.StructValidation.StructFieldValidations, - &cr.StructFieldValidation{ - StructField: "TargetReplicaConcurrency", - Float64PtrValidation: &cr.Float64PtrValidation{ - GreaterThan: pointer.Float64(0), - }, - }, - &cr.StructFieldValidation{ - StructField: "Window", - StringValidation: &cr.StringValidation{ - Default: "60s", - }, - Parser: cr.DurationParser(&cr.DurationValidation{ - GreaterThanOrEqualTo: &AutoscalingTickInterval, - MultipleOf: &AutoscalingTickInterval, - }), - }, - &cr.StructFieldValidation{ - StructField: "DownscaleStabilizationPeriod", - StringValidation: &cr.StringValidation{ - Default: "5m", - }, - Parser: cr.DurationParser(&cr.DurationValidation{ - GreaterThanOrEqualTo: pointer.Duration(libtime.MustParseDuration("0s")), - }), - }, - &cr.StructFieldValidation{ - StructField: "UpscaleStabilizationPeriod", - StringValidation: &cr.StringValidation{ - Default: "1m", - }, - Parser: cr.DurationParser(&cr.DurationValidation{ - GreaterThanOrEqualTo: pointer.Duration(libtime.MustParseDuration("0s")), - }), - }, - &cr.StructFieldValidation{ - StructField: "MaxDownscaleFactor", - Float64Validation: &cr.Float64Validation{ - Default: 0.75, - GreaterThanOrEqualTo: pointer.Float64(0), - LessThan: pointer.Float64(1), - }, - }, - &cr.StructFieldValidation{ - StructField: "MaxUpscaleFactor", - Float64Validation: &cr.Float64Validation{ - Default: 1.5, - GreaterThan: pointer.Float64(1), - }, - }, - &cr.StructFieldValidation{ - StructField: "DownscaleTolerance", - Float64Validation: &cr.Float64Validation{ - Default: 0.05, - GreaterThanOrEqualTo: pointer.Float64(0), - LessThan: pointer.Float64(1), - }, - }, - &cr.StructFieldValidation{ - StructField: "UpscaleTolerance", - Float64Validation: &cr.Float64Validation{ - Default: 0.05, - GreaterThanOrEqualTo: pointer.Float64(0), - }, - }, - ) - } else { - structFieldValidation.StructValidation.StructFieldValidations = append(structFieldValidation.StructValidation.StructFieldValidations, - &cr.StructFieldValidation{ - StructField: "TargetReplicaConcurrency", - Float64PtrValidation: &cr.Float64PtrValidation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), + { + StructField: "TargetReplicaConcurrency", + Float64PtrValidation: &cr.Float64PtrValidation{ + GreaterThan: pointer.Float64(0), + }, }, - }, - &cr.StructFieldValidation{ - StructField: "Window", - StringValidation: &cr.StringValidation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), - Default: "0", + { + StructField: "Window", + StringValidation: &cr.StringValidation{ + Default: "60s", + }, + Parser: cr.DurationParser(&cr.DurationValidation{ + GreaterThanOrEqualTo: &AutoscalingTickInterval, + MultipleOf: &AutoscalingTickInterval, + }), }, - Parser: cr.DurationParser(nil), - }, - &cr.StructFieldValidation{ - StructField: "DownscaleStabilizationPeriod", - StringValidation: &cr.StringValidation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), - Default: "0", + { + StructField: "DownscaleStabilizationPeriod", + StringValidation: &cr.StringValidation{ + Default: "5m", + }, + Parser: cr.DurationParser(&cr.DurationValidation{ + GreaterThanOrEqualTo: pointer.Duration(libtime.MustParseDuration("0s")), + }), }, - Parser: cr.DurationParser(nil), - }, - &cr.StructFieldValidation{ - StructField: "UpscaleStabilizationPeriod", - StringValidation: &cr.StringValidation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), - Default: "0", + { + StructField: "UpscaleStabilizationPeriod", + StringValidation: &cr.StringValidation{ + Default: "1m", + }, + Parser: cr.DurationParser(&cr.DurationValidation{ + GreaterThanOrEqualTo: pointer.Duration(libtime.MustParseDuration("0s")), + }), }, - Parser: cr.DurationParser(nil), - }, - &cr.StructFieldValidation{ - StructField: "MaxDownscaleFactor", - Float64Validation: &cr.Float64Validation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), + { + StructField: "MaxDownscaleFactor", + Float64Validation: &cr.Float64Validation{ + Default: 0.75, + GreaterThanOrEqualTo: pointer.Float64(0), + LessThan: pointer.Float64(1), + }, }, - }, - &cr.StructFieldValidation{ - StructField: "MaxUpscaleFactor", - Float64Validation: &cr.Float64Validation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), + { + StructField: "MaxUpscaleFactor", + Float64Validation: &cr.Float64Validation{ + Default: 1.5, + GreaterThan: pointer.Float64(1), + }, }, - }, - &cr.StructFieldValidation{ - StructField: "DownscaleTolerance", - Float64Validation: &cr.Float64Validation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), + { + StructField: "DownscaleTolerance", + Float64Validation: &cr.Float64Validation{ + Default: 0.05, + GreaterThanOrEqualTo: pointer.Float64(0), + LessThan: pointer.Float64(1), + }, }, - }, - &cr.StructFieldValidation{ - StructField: "UpscaleTolerance", - Float64Validation: &cr.Float64Validation{ - CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"), + { + StructField: "UpscaleTolerance", + Float64Validation: &cr.Float64Validation{ + Default: 0.05, + GreaterThanOrEqualTo: pointer.Float64(0), + }, }, }, - ) + }, } - - return structFieldValidation } -func updateStrategyValidation(provider types.ProviderType) *cr.StructFieldValidation { +func updateStrategyValidation() *cr.StructFieldValidation { return &cr.StructFieldValidation{ StructField: "UpdateStrategy", StructValidation: &cr.StructValidation{ @@ -900,18 +833,18 @@ func validatePredictor( switch predictor.Type { case userconfig.PythonPredictorType: - if err := validatePythonPredictor(api, models, provider, projectFiles, awsClient, gcpClient); err != nil { + if err := validatePythonPredictor(api, models, awsClient, gcpClient); err != nil { return err } case userconfig.TensorFlowPredictorType: - if err := validateTensorFlowPredictor(api, models, provider, projectFiles, awsClient, gcpClient); err != nil { + if err := validateTensorFlowPredictor(api, models, awsClient, gcpClient); err != nil { return err } if err := validateDockerImagePath(predictor.TensorFlowServingImage, provider, awsClient, k8sClient); err != nil { return errors.Wrap(err, userconfig.TensorFlowServingImageKey) } case userconfig.ONNXPredictorType: - if err := validateONNXPredictor(api, models, provider, projectFiles, awsClient, gcpClient); err != nil { + if err := validateONNXPredictor(api, models, awsClient, gcpClient); err != nil { return err } } @@ -1023,7 +956,7 @@ func validateMultiModelsFields(api *userconfig.API) error { return nil } -func validatePythonPredictor(api *userconfig.API, models *[]CuratedModelResource, provider types.ProviderType, projectFiles ProjectFiles, awsClient *aws.Client, gcpClient *gcp.Client) error { +func validatePythonPredictor(api *userconfig.API, models *[]CuratedModelResource, awsClient *aws.Client, gcpClient *gcp.Client) error { predictor := api.Predictor if predictor.Models != nil { @@ -1125,7 +1058,7 @@ func validatePythonPredictor(api *userconfig.API, models *[]CuratedModelResource return nil } -func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelResource, provider types.ProviderType, projectFiles ProjectFiles, awsClient *aws.Client, gcpClient *gcp.Client) error { +func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelResource, awsClient *aws.Client, gcpClient *gcp.Client) error { predictor := api.Predictor if predictor.ServerSideBatching != nil { @@ -1182,7 +1115,7 @@ func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelReso } } - validators := []modelValidator{} + var validators []modelValidator if api.Compute.Inf == 0 { validators = append(validators, tensorflowModelValidator) } else { @@ -1214,7 +1147,7 @@ func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelReso return nil } -func validateONNXPredictor(api *userconfig.API, models *[]CuratedModelResource, provider types.ProviderType, projectFiles ProjectFiles, awsClient *aws.Client, gcpClient *gcp.Client) error { +func validateONNXPredictor(api *userconfig.API, models *[]CuratedModelResource, awsClient *aws.Client, gcpClient *gcp.Client) error { predictor := api.Predictor if predictor.Models.SignatureKey != nil { diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 7168dab76b..1be57ae6c4 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -355,7 +355,7 @@ func (api *API) UserStr(provider types.ProviderType) string { if api.Autoscaling != nil { sb.WriteString(fmt.Sprintf("%s:\n", AutoscalingKey)) - sb.WriteString(s.Indent(api.Autoscaling.UserStr(provider), " ")) + sb.WriteString(s.Indent(api.Autoscaling.UserStr(), " ")) } if api.UpdateStrategy != nil { @@ -569,23 +569,20 @@ func (compute Compute) Equals(c2 *Compute) bool { return true } -func (autoscaling *Autoscaling) UserStr(provider types.ProviderType) string { +func (autoscaling *Autoscaling) UserStr() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("%s: %s\n", MinReplicasKey, s.Int32(autoscaling.MinReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicasKey, s.Int32(autoscaling.MaxReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", InitReplicasKey, s.Int32(autoscaling.InitReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicaConcurrencyKey, s.Int64(autoscaling.MaxReplicaConcurrency))) - - if provider == types.AWSProviderType { - sb.WriteString(fmt.Sprintf("%s: %s\n", TargetReplicaConcurrencyKey, s.Float64(*autoscaling.TargetReplicaConcurrency))) - sb.WriteString(fmt.Sprintf("%s: %s\n", WindowKey, autoscaling.Window.String())) - sb.WriteString(fmt.Sprintf("%s: %s\n", DownscaleStabilizationPeriodKey, autoscaling.DownscaleStabilizationPeriod.String())) - sb.WriteString(fmt.Sprintf("%s: %s\n", UpscaleStabilizationPeriodKey, autoscaling.UpscaleStabilizationPeriod.String())) - sb.WriteString(fmt.Sprintf("%s: %s\n", MaxDownscaleFactorKey, s.Float64(autoscaling.MaxDownscaleFactor))) - sb.WriteString(fmt.Sprintf("%s: %s\n", MaxUpscaleFactorKey, s.Float64(autoscaling.MaxUpscaleFactor))) - sb.WriteString(fmt.Sprintf("%s: %s\n", DownscaleToleranceKey, s.Float64(autoscaling.DownscaleTolerance))) - sb.WriteString(fmt.Sprintf("%s: %s\n", UpscaleToleranceKey, s.Float64(autoscaling.UpscaleTolerance))) - } + sb.WriteString(fmt.Sprintf("%s: %s\n", TargetReplicaConcurrencyKey, s.Float64(*autoscaling.TargetReplicaConcurrency))) + sb.WriteString(fmt.Sprintf("%s: %s\n", WindowKey, autoscaling.Window.String())) + sb.WriteString(fmt.Sprintf("%s: %s\n", DownscaleStabilizationPeriodKey, autoscaling.DownscaleStabilizationPeriod.String())) + sb.WriteString(fmt.Sprintf("%s: %s\n", UpscaleStabilizationPeriodKey, autoscaling.UpscaleStabilizationPeriod.String())) + sb.WriteString(fmt.Sprintf("%s: %s\n", MaxDownscaleFactorKey, s.Float64(autoscaling.MaxDownscaleFactor))) + sb.WriteString(fmt.Sprintf("%s: %s\n", MaxUpscaleFactorKey, s.Float64(autoscaling.MaxUpscaleFactor))) + sb.WriteString(fmt.Sprintf("%s: %s\n", DownscaleToleranceKey, s.Float64(autoscaling.DownscaleTolerance))) + sb.WriteString(fmt.Sprintf("%s: %s\n", UpscaleToleranceKey, s.Float64(autoscaling.UpscaleTolerance))) return sb.String() }