From 608ddd97ee76a80d0b5fbf0737c15eb1ad4ca865 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 16 Apr 2021 23:38:03 +0300 Subject: [PATCH 1/7] Add node-selector option to APIs --- pkg/operator/operator/k8s.go | 23 +++++++++- pkg/operator/resources/asyncapi/k8s_specs.go | 40 +++++++++++------- pkg/operator/resources/errors.go | 8 ++++ .../resources/job/batchapi/k8s_specs.go | 42 ++++++++++++------- .../resources/job/taskapi/k8s_specs.go | 21 ++++++---- .../resources/realtimeapi/k8s_specs.go | 42 ++++++++++++------- pkg/operator/resources/validations.go | 10 +++++ pkg/types/clusterconfig/cluster_config.go | 20 +++++++++ pkg/types/spec/validations.go | 8 ++++ pkg/types/userconfig/api.go | 32 ++++++++++++-- pkg/types/userconfig/config_key.go | 9 ++-- 11 files changed, 191 insertions(+), 64 deletions(-) diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go index 08caade4e0..470cca548d 100644 --- a/pkg/operator/operator/k8s.go +++ b/pkg/operator/operator/k8s.go @@ -1240,10 +1240,29 @@ func defaultVolumeMounts() []kcore.VolumeMount { } } -func NodeSelectors() map[string]string { - return map[string]string{ +func NodeSelectors(nodeGroupSelector *string) map[string]string { + nodeSelectors := map[string]string{ "workload": "true", } + + if nodeGroupSelector == nil { + return nodeSelectors + } + + var nodeGroupPrefix string + for _, nodeGroup := range config.ManagedConfig.NodeGroups { + if nodeGroup.Name == *nodeGroupSelector { + if nodeGroup.Spot { + nodeGroupPrefix = "cx-ws-" + } else { + nodeGroupPrefix = "cx-wd-" + } + break + } + } + + nodeSelectors["alpha.eksctl.io/nodegroup-name"] = nodeGroupPrefix + *nodeGroupSelector + return nodeSelectors } func GenerateResourceTolerations() []kcore.Toleration { diff --git a/pkg/operator/resources/asyncapi/k8s_specs.go b/pkg/operator/resources/asyncapi/k8s_specs.go index b5550bef8d..38d93b5f8b 100644 --- a/pkg/operator/resources/asyncapi/k8s_specs.go +++ b/pkg/operator/resources/asyncapi/k8s_specs.go @@ -36,6 +36,16 @@ var _gatewayHPATargetMemUtilization int32 = 80 // percentage func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL string) kapps.Deployment { container := operator.AsyncGatewayContainers(api, queueURL) + + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return *k8s.Deployment(&k8s.DeploymentSpec{ Name: getGatewayK8sName(api.Name), Replicas: 1, @@ -69,13 +79,10 @@ func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queue RestartPolicy: "Always", TerminationGracePeriodSeconds: pointer.Int64(_terminationGracePeriodSeconds), Containers: []kcore.Container{container}, - NodeSelector: operator.NodeSelectors(), + NodeSelector: operator.NodeSelectors(api.Compute.Selector), Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, ServiceAccountName: operator.ServiceAccountName, + Affinity: affinity, + ServiceAccountName: operator.ServiceAccountName, }, }, }) @@ -171,6 +178,15 @@ func apiDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL panic(fmt.Sprintf("invalid predictor type: %s", api.Predictor.Type)) } + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return *k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), @@ -207,14 +223,10 @@ func apiDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL InitContainers: []kcore.Container{ operator.InitContainer(&api), }, - Containers: containers, - NodeSelector: operator.NodeSelectors(), - Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, + Containers: containers, + NodeSelector: operator.NodeSelectors(api.Compute.Selector), + Tolerations: operator.GenerateResourceTolerations(), + Affinity: affinity, Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/errors.go b/pkg/operator/resources/errors.go index 6a829d3145..ecc99381aa 100644 --- a/pkg/operator/resources/errors.go +++ b/pkg/operator/resources/errors.go @@ -37,6 +37,7 @@ const ( ErrRealtimeAPIUsedByTrafficSplitter = "resources.realtime_api_used_by_traffic_splitter" ErrAPIsNotDeployed = "resources.apis_not_deployed" ErrGRPCNotSupportedForTrafficSplitter = "resources.grpc_not_supported_for_traffic_splitter" + ErrInvalidNodeGroupSelector = "resources.invalid_node_group_selector" ) func ErrorOperationIsOnlySupportedForKind(resource operator.DeployedResource, supportedKind userconfig.Kind, supportedKinds ...userconfig.Kind) error { @@ -116,3 +117,10 @@ func ErrorGRPCNotSupportedForTrafficSplitter(grpcAPIName string) error { Message: fmt.Sprintf("api %s (of kind %s) is served using the grpc protocol and therefore, it cannot be used for the %s kind", grpcAPIName, userconfig.RealtimeAPIKind, userconfig.TrafficSplitterKind), }) } + +func ErrorInvalidNodeGroupSelector(selected string, availableNodeGroups []string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrInvalidNodeGroupSelector, + Message: fmt.Sprintf("node group %s doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)), + }) +} diff --git a/pkg/operator/resources/job/batchapi/k8s_specs.go b/pkg/operator/resources/job/batchapi/k8s_specs.go index c4db5a352d..b8eee5505a 100644 --- a/pkg/operator/resources/job/batchapi/k8s_specs.go +++ b/pkg/operator/resources/job/batchapi/k8s_specs.go @@ -57,6 +57,15 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, err } } + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return k8s.Job(&k8s.JobSpec{ Name: job.JobKey.K8sName(), Parallelism: int32(job.Workers), @@ -86,14 +95,10 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, err InitContainers: []kcore.Container{ operator.InitContainer(api), }, - Containers: containers, - NodeSelector: operator.NodeSelectors(), - Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, + Containers: containers, + NodeSelector: operator.NodeSelectors(api.Compute.Selector), + Tolerations: operator.GenerateResourceTolerations(), + Affinity: affinity, Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, @@ -112,6 +117,15 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, } } + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return k8s.Job(&k8s.JobSpec{ Name: job.JobKey.K8sName(), Parallelism: int32(job.Workers), @@ -141,14 +155,10 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, InitContainers: []kcore.Container{ operator.InitContainer(api), }, - Containers: containers, - NodeSelector: operator.NodeSelectors(), - Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, + Containers: containers, + NodeSelector: operator.NodeSelectors(api.Compute.Selector), + Tolerations: operator.GenerateResourceTolerations(), + Affinity: affinity, Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/job/taskapi/k8s_specs.go b/pkg/operator/resources/job/taskapi/k8s_specs.go index 4ad7215c20..86697c49eb 100644 --- a/pkg/operator/resources/job/taskapi/k8s_specs.go +++ b/pkg/operator/resources/job/taskapi/k8s_specs.go @@ -88,6 +88,15 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { } } + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return k8s.Job(&k8s.JobSpec{ Name: job.JobKey.K8sName(), Parallelism: int32(job.Workers), @@ -117,14 +126,10 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { InitContainers: []kcore.Container{ operator.TaskInitContainer(api), }, - Containers: containers, - NodeSelector: operator.NodeSelectors(), - Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, + Containers: containers, + NodeSelector: operator.NodeSelectors(api.Compute.Selector), + Tolerations: operator.GenerateResourceTolerations(), + Affinity: affinity, Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go index 7c174b0e12..cd67fc3755 100644 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ b/pkg/operator/resources/realtimeapi/k8s_specs.go @@ -49,6 +49,15 @@ func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.D servingProtocol = "grpc" } + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), @@ -87,14 +96,10 @@ func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.D InitContainers: []kcore.Container{ operator.InitContainer(api), }, - Containers: containers, - NodeSelector: operator.NodeSelectors(), - Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, + Containers: containers, + NodeSelector: operator.NodeSelectors(api.Compute.Selector), + Tolerations: operator.GenerateResourceTolerations(), + Affinity: affinity, Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, @@ -111,6 +116,15 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo servingProtocol = "grpc" } + var affinity *kcore.Affinity + if api.Compute.Selector == nil { + affinity = &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), + }, + } + } + return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), @@ -149,14 +163,10 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo InitContainers: []kcore.Container{ operator.InitContainer(api), }, - Containers: containers, - NodeSelector: operator.NodeSelectors(), - Tolerations: operator.GenerateResourceTolerations(), - Affinity: &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - }, + Containers: containers, + NodeSelector: operator.NodeSelectors(api.Compute.Selector), + Tolerations: operator.GenerateResourceTolerations(), + Affinity: affinity, Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 914144486d..97d41a48a0 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -191,7 +191,17 @@ var _inferentiaMemReserve = kresource.MustParse("100Mi") func validateK8sCompute(compute *userconfig.Compute, maxMemMap map[string]kresource.Quantity) error { allErrors := []error{} successfulLoops := 0 + + nodeGroupNames := strset.New(config.ManagedConfig.GetNodeGroupNames()...) + if compute.Selector != nil && !nodeGroupNames.Has(*compute.Selector) { + return ErrorInvalidNodeGroupSelector(*compute.Selector, nodeGroupNames.Slice()) + } + for _, instanceMetadata := range config.InstancesMetadata { + if compute.Selector != nil && config.ManagedConfig.GetNodeGroupByName(*compute.Selector).InstanceType != instanceMetadata.Type { + continue + } + maxMemLoop := maxMemMap[instanceMetadata.Type] maxMemLoop.Sub(_cortexMemReserve) diff --git a/pkg/types/clusterconfig/cluster_config.go b/pkg/types/clusterconfig/cluster_config.go index b86e5fe474..4a8accad2d 100644 --- a/pkg/types/clusterconfig/cluster_config.go +++ b/pkg/types/clusterconfig/cluster_config.go @@ -1415,6 +1415,26 @@ func (mc *ManagedConfig) GetAllInstanceTypes() []string { return allInstanceTypes.Slice() } +func (mc *ManagedConfig) GetNodeGroupByName(name string) *NodeGroup { + for _, ng := range mc.NodeGroups { + if ng.Name == name { + matchedNodeGroup := *ng + return &matchedNodeGroup + } + } + + return nil +} + +func (mc *ManagedConfig) GetNodeGroupNames() []string { + allNodeGroupNames := []string{} + for _, ng := range mc.NodeGroups { + allNodeGroupNames = append(allNodeGroupNames, ng.Name) + } + + return allNodeGroupNames +} + func validateClusterName(clusterName string) (string, error) { if !_strictS3BucketRegex.MatchString(clusterName) { return "", errors.Wrap(ErrorDidNotMatchStrictS3Regex(), clusterName) diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 7c7d83fcdf..a14e6a06ff 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -397,6 +397,14 @@ func computeValidation() *cr.StructFieldValidation { GreaterThanOrEqualTo: pointer.Int64(0), }, }, + { + StructField: "Selector", + StringPtrValidation: &cr.StringPtrValidation{ + Default: nil, + AllowExplicitNull: true, + AlphaNumericDashUnderscore: true, + }, + }, }, }, } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index b3f959f25f..e22387b57b 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -111,10 +111,11 @@ type Networking struct { } type Compute struct { - CPU *k8s.Quantity `json:"cpu" yaml:"cpu"` - Mem *k8s.Quantity `json:"mem" yaml:"mem"` - GPU int64 `json:"gpu" yaml:"gpu"` - Inf int64 `json:"inf" yaml:"inf"` + CPU *k8s.Quantity `json:"cpu" yaml:"cpu"` + Mem *k8s.Quantity `json:"mem" yaml:"mem"` + GPU int64 `json:"gpu" yaml:"gpu"` + Inf int64 `json:"inf" yaml:"inf"` + Selector *string `json:"selector" yaml:"selector"` } type Autoscaling struct { @@ -540,6 +541,12 @@ func (compute *Compute) Normalized() string { } else { sb.WriteString(fmt.Sprintf("%s: %d\n", MemKey, compute.Mem.Value())) } + if compute.Selector == nil { + sb.WriteString(fmt.Sprintf("%s: null\n", SelectorKey)) + } else { + sb.WriteString(fmt.Sprintf("%s: %s\n", SelectorKey, *compute.Selector)) + } + return sb.String() } @@ -561,6 +568,11 @@ func (compute *Compute) UserStr() string { } else { sb.WriteString(fmt.Sprintf("%s: %s\n", MemKey, compute.Mem.UserString)) } + if compute.Selector == nil { + sb.WriteString(fmt.Sprintf("%s: null # automatic node-group selection\n", SelectorKey)) + } else { + sb.WriteString(fmt.Sprintf("%s: %s\n", SelectorKey, *compute.Selector)) + } return sb.String() } @@ -589,6 +601,14 @@ func (compute Compute) Equals(c2 *Compute) bool { return false } + if compute.Selector == nil && c2.Selector != nil || compute.Selector != nil && c2.Selector == nil { + return false + } + + if *compute.Selector != *c2.Selector { + return false + } + return true } @@ -655,6 +675,10 @@ func (api *API) TelemetryEvent() map[string]interface{} { } event["compute.gpu"] = api.Compute.GPU event["compute.inf"] = api.Compute.Inf + if api.Compute.Selector != nil { + event["compute.selector._is_defined"] = true + event["compute.selector"] = *api.Compute.Selector + } } if api.Predictor != nil { diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 8a6d2a7912..7885b35527 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -76,10 +76,11 @@ const ( EndpointKey = "endpoint" // Compute - CPUKey = "cpu" - MemKey = "mem" - GPUKey = "gpu" - InfKey = "inf" + CPUKey = "cpu" + MemKey = "mem" + GPUKey = "gpu" + InfKey = "inf" + SelectorKey = "selector" // Autoscaling MinReplicasKey = "min_replicas" From 6b91d26af45fc01148e03115bad738c1c6777fd8 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 16 Apr 2021 23:38:14 +0300 Subject: [PATCH 2/7] Add docs for the API node-selector --- docs/workloads/async/configuration.md | 1 + docs/workloads/batch/configuration.md | 1 + docs/workloads/realtime/configuration.md | 1 + docs/workloads/task/configuration.md | 1 + 4 files changed, 4 insertions(+) diff --git a/docs/workloads/async/configuration.md b/docs/workloads/async/configuration.md index cf9a28db08..11b3f8e19c 100644 --- a/docs/workloads/async/configuration.md +++ b/docs/workloads/async/configuration.md @@ -63,6 +63,7 @@ compute: cpu: # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) + selector: # to select a specific node group (optional) ``` ## Autoscaling diff --git a/docs/workloads/batch/configuration.md b/docs/workloads/batch/configuration.md index 7eff0f92b2..560313ac56 100644 --- a/docs/workloads/batch/configuration.md +++ b/docs/workloads/batch/configuration.md @@ -67,6 +67,7 @@ compute: cpu: # CPU request per worker. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) + selector: # to select a specific node group (optional) ``` ## Networking diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md index 295261f6d1..03b7dca163 100644 --- a/docs/workloads/realtime/configuration.md +++ b/docs/workloads/realtime/configuration.md @@ -90,6 +90,7 @@ compute: cpu: # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) + selector: # to select a specific node group (optional) ``` ## Autoscaling diff --git a/docs/workloads/task/configuration.md b/docs/workloads/task/configuration.md index 98232c79a4..5583611895 100644 --- a/docs/workloads/task/configuration.md +++ b/docs/workloads/task/configuration.md @@ -22,4 +22,5 @@ gpu: # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0) inf: # Inferentia request per worker. One unit corresponds to one Inferentia ASIC with 4 NeuronCores and 8GB of cache memory. Each process will have one NeuronCore Group with (4 * inf / processes_per_replica) NeuronCores, so your model should be compiled to run on (4 * inf / processes_per_replica) NeuronCores. (default: 0) mem: # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) + selector: # to select a specific node group (optional) ``` From 65addbf82cf30e4887ef2064580134b833b1a206 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Sat, 17 Apr 2021 03:11:20 +0300 Subject: [PATCH 3/7] Add preferred/required node groups & address PR comments --- docs/workloads/async/configuration.md | 2 +- docs/workloads/batch/configuration.md | 2 +- docs/workloads/realtime/configuration.md | 2 +- docs/workloads/task/configuration.md | 2 +- pkg/lib/configreader/string_list.go | 37 +++++---- pkg/operator/operator/k8s.go | 76 ++++++++++++------- pkg/operator/resources/asyncapi/k8s_specs.go | 26 +------ .../resources/job/batchapi/k8s_specs.go | 26 +------ .../resources/job/taskapi/k8s_specs.go | 13 +--- .../resources/realtimeapi/k8s_specs.go | 26 +------ pkg/operator/resources/validations.go | 24 ++++-- pkg/types/spec/validations.go | 14 ++-- pkg/types/userconfig/api.go | 32 ++++---- pkg/types/userconfig/config_key.go | 10 +-- 14 files changed, 139 insertions(+), 153 deletions(-) diff --git a/docs/workloads/async/configuration.md b/docs/workloads/async/configuration.md index 11b3f8e19c..d248e7efe4 100644 --- a/docs/workloads/async/configuration.md +++ b/docs/workloads/async/configuration.md @@ -63,7 +63,7 @@ compute: cpu: # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - selector: # to select a specific node group (optional) + node_groups: # to select a specific node group (optional) ``` ## Autoscaling diff --git a/docs/workloads/batch/configuration.md b/docs/workloads/batch/configuration.md index 560313ac56..79be5c8468 100644 --- a/docs/workloads/batch/configuration.md +++ b/docs/workloads/batch/configuration.md @@ -67,7 +67,7 @@ compute: cpu: # CPU request per worker. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - selector: # to select a specific node group (optional) + node_groups: # to select a specific node group (optional) ``` ## Networking diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md index 03b7dca163..67f9112b86 100644 --- a/docs/workloads/realtime/configuration.md +++ b/docs/workloads/realtime/configuration.md @@ -90,7 +90,7 @@ compute: cpu: # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - selector: # to select a specific node group (optional) + node_groups: # to select a specific node group (optional) ``` ## Autoscaling diff --git a/docs/workloads/task/configuration.md b/docs/workloads/task/configuration.md index 5583611895..4c7e1deff8 100644 --- a/docs/workloads/task/configuration.md +++ b/docs/workloads/task/configuration.md @@ -22,5 +22,5 @@ gpu: # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0) inf: # Inferentia request per worker. One unit corresponds to one Inferentia ASIC with 4 NeuronCores and 8GB of cache memory. Each process will have one NeuronCore Group with (4 * inf / processes_per_replica) NeuronCores, so your model should be compiled to run on (4 * inf / processes_per_replica) NeuronCores. (default: 0) mem: # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - selector: # to select a specific node group (optional) + node_groups: # to select a specific node group (optional) ``` diff --git a/pkg/lib/configreader/string_list.go b/pkg/lib/configreader/string_list.go index 1712097aa4..b8cbe41da1 100644 --- a/pkg/lib/configreader/string_list.go +++ b/pkg/lib/configreader/string_list.go @@ -20,22 +20,24 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/cast" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/slices" + s "github.com/cortexlabs/cortex/pkg/lib/strings" ) type StringListValidation struct { - Required bool - Default []string - AllowExplicitNull bool - AllowEmpty bool - CantBeSpecifiedErrStr *string - CastSingleItem bool - DisallowDups bool - MinLength int - MaxLength int - InvalidLengths []int - AllowCortexResources bool - RequireCortexResources bool - Validator func([]string) ([]string, error) + Required bool + Default []string + AllowExplicitNull bool + AllowEmpty bool + CantBeSpecifiedErrStr *string + CastSingleItem bool + DisallowDups bool + MinLength int + MaxLength int + InvalidLengths []int + AllowCortexResources bool + RequireCortexResources bool + ElementStringValidation *StringValidation // Required, Default, AllowEmpty, TreatNullAsEmpty & Validator fields not applicable here + Validator func([]string) ([]string, error) } func StringList(inter interface{}, v *StringListValidation) ([]string, error) { @@ -129,6 +131,15 @@ func validateStringList(val []string, v *StringListValidation) ([]string, error) } } + if v.ElementStringValidation != nil { + for i, element := range val { + err := ValidateStringVal(element, v.ElementStringValidation) + if err != nil { + return nil, errors.Wrap(err, s.Index(i)) + } + } + } + if v.Validator != nil { return v.Validator(val) } diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go index 470cca548d..a867f3c465 100644 --- a/pkg/operator/operator/k8s.go +++ b/pkg/operator/operator/k8s.go @@ -30,6 +30,7 @@ import ( s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/urls" "github.com/cortexlabs/cortex/pkg/operator/config" + "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/userconfig" kcore "k8s.io/api/core/v1" @@ -1240,29 +1241,10 @@ func defaultVolumeMounts() []kcore.VolumeMount { } } -func NodeSelectors(nodeGroupSelector *string) map[string]string { - nodeSelectors := map[string]string{ +func NodeSelectors() map[string]string { + return map[string]string{ "workload": "true", } - - if nodeGroupSelector == nil { - return nodeSelectors - } - - var nodeGroupPrefix string - for _, nodeGroup := range config.ManagedConfig.NodeGroups { - if nodeGroup.Name == *nodeGroupSelector { - if nodeGroup.Spot { - nodeGroupPrefix = "cx-ws-" - } else { - nodeGroupPrefix = "cx-wd-" - } - break - } - } - - nodeSelectors["alpha.eksctl.io/nodegroup-name"] = nodeGroupPrefix + *nodeGroupSelector - return nodeSelectors } func GenerateResourceTolerations() []kcore.Toleration { @@ -1289,18 +1271,35 @@ func GenerateResourceTolerations() []kcore.Toleration { return tolerations } -func GeneratePreferredNodeAffinities() []kcore.PreferredSchedulingTerm { - affinities := []kcore.PreferredSchedulingTerm{} +func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity { + preferredAffinities := []kcore.PreferredSchedulingTerm{} + + // node groups are ordered according to how the cluster config node groups are ordered + var nodeGroups []*clusterconfig.NodeGroup + for _, clusterNodeGroup := range config.ManagedConfig.NodeGroups { + for _, apiNodeGroupName := range apiNodeGroups { + if clusterNodeGroup.Name == apiNodeGroupName { + nodeGroups = append(nodeGroups, clusterNodeGroup) + } + } + } - numNodeGroups := len(config.ManagedConfig.NodeGroups) - for idx, nodeGroup := range config.ManagedConfig.NodeGroups { + numNodeGroups := len(apiNodeGroups) + requiredNodeGroups := []string{} + if apiNodeGroups == nil { + nodeGroups = config.ManagedConfig.NodeGroups + numNodeGroups = len(config.ManagedConfig.NodeGroups) + } + + for idx, nodeGroup := range nodeGroups { var nodeGroupPrefix string if nodeGroup.Spot { nodeGroupPrefix = "cx-ws-" } else { nodeGroupPrefix = "cx-wd-" } - affinities = append(affinities, kcore.PreferredSchedulingTerm{ + + preferredAffinities = append(preferredAffinities, kcore.PreferredSchedulingTerm{ Weight: int32(100 * (1 - float64(idx)/float64(numNodeGroups))), Preference: kcore.NodeSelectorTerm{ MatchExpressions: []kcore.NodeSelectorRequirement{ @@ -1312,9 +1311,32 @@ func GeneratePreferredNodeAffinities() []kcore.PreferredSchedulingTerm { }, }, }) + requiredNodeGroups = append(requiredNodeGroups, nodeGroupPrefix+nodeGroup.Name) } - return affinities + var requiredNodeSelector *kcore.NodeSelector + if apiNodeGroups != nil { + requiredNodeSelector = &kcore.NodeSelector{ + NodeSelectorTerms: []kcore.NodeSelectorTerm{ + { + MatchExpressions: []kcore.NodeSelectorRequirement{ + { + Key: "alpha.eksctl.io/nodegroup-name", + Operator: kcore.NodeSelectorOpIn, + Values: requiredNodeGroups, + }, + }, + }, + }, + } + } + + return &kcore.Affinity{ + NodeAffinity: &kcore.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: preferredAffinities, + RequiredDuringSchedulingIgnoredDuringExecution: requiredNodeSelector, + }, + } } func K8sName(apiName string) string { diff --git a/pkg/operator/resources/asyncapi/k8s_specs.go b/pkg/operator/resources/asyncapi/k8s_specs.go index 38d93b5f8b..ff791a01fc 100644 --- a/pkg/operator/resources/asyncapi/k8s_specs.go +++ b/pkg/operator/resources/asyncapi/k8s_specs.go @@ -37,15 +37,6 @@ var _gatewayHPATargetMemUtilization int32 = 80 // percentage func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL string) kapps.Deployment { container := operator.AsyncGatewayContainers(api, queueURL) - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return *k8s.Deployment(&k8s.DeploymentSpec{ Name: getGatewayK8sName(api.Name), Replicas: 1, @@ -79,9 +70,9 @@ func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queue RestartPolicy: "Always", TerminationGracePeriodSeconds: pointer.Int64(_terminationGracePeriodSeconds), Containers: []kcore.Container{container}, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), ServiceAccountName: operator.ServiceAccountName, }, }, @@ -178,15 +169,6 @@ func apiDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL panic(fmt.Sprintf("invalid predictor type: %s", api.Predictor.Type)) } - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return *k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), @@ -224,9 +206,9 @@ func apiDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL operator.InitContainer(&api), }, Containers: containers, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/job/batchapi/k8s_specs.go b/pkg/operator/resources/job/batchapi/k8s_specs.go index b8eee5505a..79ac6b708c 100644 --- a/pkg/operator/resources/job/batchapi/k8s_specs.go +++ b/pkg/operator/resources/job/batchapi/k8s_specs.go @@ -57,15 +57,6 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, err } } - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return k8s.Job(&k8s.JobSpec{ Name: job.JobKey.K8sName(), Parallelism: int32(job.Workers), @@ -96,9 +87,9 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, err operator.InitContainer(api), }, Containers: containers, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, @@ -117,15 +108,6 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, } } - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return k8s.Job(&k8s.JobSpec{ Name: job.JobKey.K8sName(), Parallelism: int32(job.Workers), @@ -156,9 +138,9 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, operator.InitContainer(api), }, Containers: containers, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/job/taskapi/k8s_specs.go b/pkg/operator/resources/job/taskapi/k8s_specs.go index 86697c49eb..3264058e50 100644 --- a/pkg/operator/resources/job/taskapi/k8s_specs.go +++ b/pkg/operator/resources/job/taskapi/k8s_specs.go @@ -88,15 +88,6 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { } } - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return k8s.Job(&k8s.JobSpec{ Name: job.JobKey.K8sName(), Parallelism: int32(job.Workers), @@ -127,9 +118,9 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job { operator.TaskInitContainer(api), }, Containers: containers, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go index cd67fc3755..728c71243d 100644 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ b/pkg/operator/resources/realtimeapi/k8s_specs.go @@ -49,15 +49,6 @@ func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.D servingProtocol = "grpc" } - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), @@ -97,9 +88,9 @@ func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.D operator.InitContainer(api), }, Containers: containers, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, @@ -116,15 +107,6 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo servingProtocol = "grpc" } - var affinity *kcore.Affinity - if api.Compute.Selector == nil { - affinity = &kcore.Affinity{ - NodeAffinity: &kcore.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(), - }, - } - } - return k8s.Deployment(&k8s.DeploymentSpec{ Name: operator.K8sName(api.Name), Replicas: getRequestedReplicasFromDeployment(api, prevDeployment), @@ -164,9 +146,9 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo operator.InitContainer(api), }, Containers: containers, - NodeSelector: operator.NodeSelectors(api.Compute.Selector), + NodeSelector: operator.NodeSelectors(), Tolerations: operator.GenerateResourceTolerations(), - Affinity: affinity, + Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups), Volumes: volumes, ServiceAccountName: operator.ServiceAccountName, }, diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 97d41a48a0..d8a3050f4c 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -192,14 +192,28 @@ func validateK8sCompute(compute *userconfig.Compute, maxMemMap map[string]kresou allErrors := []error{} successfulLoops := 0 - nodeGroupNames := strset.New(config.ManagedConfig.GetNodeGroupNames()...) - if compute.Selector != nil && !nodeGroupNames.Has(*compute.Selector) { - return ErrorInvalidNodeGroupSelector(*compute.Selector, nodeGroupNames.Slice()) + clusterNodeGroupNames := strset.New(config.ManagedConfig.GetNodeGroupNames()...) + apiNodeGroupNames := compute.NodeGroups + + if apiNodeGroupNames != nil { + for _, ngName := range apiNodeGroupNames { + if !clusterNodeGroupNames.Has(ngName) { + return ErrorInvalidNodeGroupSelector(ngName, clusterNodeGroupNames.Slice()) + } + } } for _, instanceMetadata := range config.InstancesMetadata { - if compute.Selector != nil && config.ManagedConfig.GetNodeGroupByName(*compute.Selector).InstanceType != instanceMetadata.Type { - continue + if apiNodeGroupNames != nil { + matchedNodeGroups := 0 + for _, ngName := range apiNodeGroupNames { + if config.ManagedConfig.GetNodeGroupByName(ngName).InstanceType != instanceMetadata.Type { + matchedNodeGroups++ + } + } + if matchedNodeGroups == 0 { + continue + } } maxMemLoop := maxMemMap[instanceMetadata.Type] diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index a14e6a06ff..2a60205ad0 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -398,11 +398,15 @@ func computeValidation() *cr.StructFieldValidation { }, }, { - StructField: "Selector", - StringPtrValidation: &cr.StringPtrValidation{ - Default: nil, - AllowExplicitNull: true, - AlphaNumericDashUnderscore: true, + StructField: "NodeGroups", + StringListValidation: &cr.StringListValidation{ + Required: false, + Default: nil, + AllowExplicitNull: true, + AllowEmpty: false, + ElementStringValidation: &cr.StringValidation{ + AlphaNumericDashUnderscore: true, + }, }, }, }, diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index e22387b57b..95f8b09ff6 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -23,6 +23,7 @@ import ( "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/urls" "github.com/cortexlabs/yaml" @@ -111,11 +112,11 @@ type Networking struct { } type Compute struct { - CPU *k8s.Quantity `json:"cpu" yaml:"cpu"` - Mem *k8s.Quantity `json:"mem" yaml:"mem"` - GPU int64 `json:"gpu" yaml:"gpu"` - Inf int64 `json:"inf" yaml:"inf"` - Selector *string `json:"selector" yaml:"selector"` + CPU *k8s.Quantity `json:"cpu" yaml:"cpu"` + Mem *k8s.Quantity `json:"mem" yaml:"mem"` + GPU int64 `json:"gpu" yaml:"gpu"` + Inf int64 `json:"inf" yaml:"inf"` + NodeGroups []string `json:"node_groups" yaml:"node_groups"` } type Autoscaling struct { @@ -541,10 +542,10 @@ func (compute *Compute) Normalized() string { } else { sb.WriteString(fmt.Sprintf("%s: %d\n", MemKey, compute.Mem.Value())) } - if compute.Selector == nil { - sb.WriteString(fmt.Sprintf("%s: null\n", SelectorKey)) + if compute.NodeGroups == nil { + sb.WriteString(fmt.Sprintf("%s: null\n", NodeGroupsKey)) } else { - sb.WriteString(fmt.Sprintf("%s: %s\n", SelectorKey, *compute.Selector)) + sb.WriteString(fmt.Sprintf("%s: %s\n", NodeGroupsKey, s.ObjFlatNoQuotes(compute.NodeGroups))) } return sb.String() @@ -568,10 +569,10 @@ func (compute *Compute) UserStr() string { } else { sb.WriteString(fmt.Sprintf("%s: %s\n", MemKey, compute.Mem.UserString)) } - if compute.Selector == nil { - sb.WriteString(fmt.Sprintf("%s: null # automatic node-group selection\n", SelectorKey)) + if compute.NodeGroups == nil { + sb.WriteString(fmt.Sprintf("%s: null # automatic node-group selection\n", NodeGroupsKey)) } else { - sb.WriteString(fmt.Sprintf("%s: %s\n", SelectorKey, *compute.Selector)) + sb.WriteString(fmt.Sprintf("%s: %s\n", NodeGroupsKey, s.ObjFlatNoQuotes(compute.NodeGroups))) } return sb.String() } @@ -601,11 +602,11 @@ func (compute Compute) Equals(c2 *Compute) bool { return false } - if compute.Selector == nil && c2.Selector != nil || compute.Selector != nil && c2.Selector == nil { + if compute.NodeGroups == nil && c2.NodeGroups != nil || compute.NodeGroups != nil && c2.NodeGroups == nil { return false } - if *compute.Selector != *c2.Selector { + if !strset.New(compute.NodeGroups...).IsEqual(strset.New(c2.NodeGroups...)) { return false } @@ -675,10 +676,7 @@ func (api *API) TelemetryEvent() map[string]interface{} { } event["compute.gpu"] = api.Compute.GPU event["compute.inf"] = api.Compute.Inf - if api.Compute.Selector != nil { - event["compute.selector._is_defined"] = true - event["compute.selector"] = *api.Compute.Selector - } + event["compute.node_groups._is_defined"] = len(api.Compute.NodeGroups) > 0 } if api.Predictor != nil { diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 7885b35527..faa9c8c6e5 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -76,11 +76,11 @@ const ( EndpointKey = "endpoint" // Compute - CPUKey = "cpu" - MemKey = "mem" - GPUKey = "gpu" - InfKey = "inf" - SelectorKey = "selector" + CPUKey = "cpu" + MemKey = "mem" + GPUKey = "gpu" + InfKey = "inf" + NodeGroupsKey = "node_groups" // Autoscaling MinReplicasKey = "min_replicas" From fdcaf12add99d7122b3f8f8c96285204bbb9ee66 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Sat, 17 Apr 2021 05:39:00 +0300 Subject: [PATCH 4/7] Fixes --- pkg/operator/resources/validations.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index d8a3050f4c..f58aa9b15c 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -198,7 +198,7 @@ func validateK8sCompute(compute *userconfig.Compute, maxMemMap map[string]kresou if apiNodeGroupNames != nil { for _, ngName := range apiNodeGroupNames { if !clusterNodeGroupNames.Has(ngName) { - return ErrorInvalidNodeGroupSelector(ngName, clusterNodeGroupNames.Slice()) + return ErrorInvalidNodeGroupSelector(ngName, config.ManagedConfig.GetNodeGroupNames()) } } } @@ -207,7 +207,7 @@ func validateK8sCompute(compute *userconfig.Compute, maxMemMap map[string]kresou if apiNodeGroupNames != nil { matchedNodeGroups := 0 for _, ngName := range apiNodeGroupNames { - if config.ManagedConfig.GetNodeGroupByName(ngName).InstanceType != instanceMetadata.Type { + if config.ManagedConfig.GetNodeGroupByName(ngName).InstanceType == instanceMetadata.Type { matchedNodeGroups++ } } From c32ddd3e2b5847a1c6dd6ae4dc192764704de348 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Sat, 17 Apr 2021 05:41:51 +0300 Subject: [PATCH 5/7] Update docs --- docs/workloads/async/configuration.md | 2 +- docs/workloads/batch/configuration.md | 2 +- docs/workloads/realtime/configuration.md | 2 +- docs/workloads/task/configuration.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/workloads/async/configuration.md b/docs/workloads/async/configuration.md index d248e7efe4..e69e9243e2 100644 --- a/docs/workloads/async/configuration.md +++ b/docs/workloads/async/configuration.md @@ -63,7 +63,7 @@ compute: cpu: # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - node_groups: # to select a specific node group (optional) + node_groups: # to select specific node groups (optional) ``` ## Autoscaling diff --git a/docs/workloads/batch/configuration.md b/docs/workloads/batch/configuration.md index 79be5c8468..1363f0b508 100644 --- a/docs/workloads/batch/configuration.md +++ b/docs/workloads/batch/configuration.md @@ -67,7 +67,7 @@ compute: cpu: # CPU request per worker. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - node_groups: # to select a specific node group (optional) + node_groups: # to select specific node groups (optional) ``` ## Networking diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md index 67f9112b86..73a6cb2142 100644 --- a/docs/workloads/realtime/configuration.md +++ b/docs/workloads/realtime/configuration.md @@ -90,7 +90,7 @@ compute: cpu: # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m) gpu: # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0) mem: # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - node_groups: # to select a specific node group (optional) + node_groups: # to select specific node groups (optional) ``` ## Autoscaling diff --git a/docs/workloads/task/configuration.md b/docs/workloads/task/configuration.md index 4c7e1deff8..8eff822b3d 100644 --- a/docs/workloads/task/configuration.md +++ b/docs/workloads/task/configuration.md @@ -22,5 +22,5 @@ gpu: # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0) inf: # Inferentia request per worker. One unit corresponds to one Inferentia ASIC with 4 NeuronCores and 8GB of cache memory. Each process will have one NeuronCore Group with (4 * inf / processes_per_replica) NeuronCores, so your model should be compiled to run on (4 * inf / processes_per_replica) NeuronCores. (default: 0) mem: # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null) - node_groups: # to select a specific node group (optional) + node_groups: # to select specific node groups (optional) ``` From 6b84d894e647c36d0bdeeaeb2ab809b4affabfb7 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Fri, 16 Apr 2021 21:06:39 -0700 Subject: [PATCH 6/7] Update api.go --- pkg/types/userconfig/api.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 95f8b09ff6..adfb2ad1c3 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -677,6 +677,7 @@ func (api *API) TelemetryEvent() map[string]interface{} { event["compute.gpu"] = api.Compute.GPU event["compute.inf"] = api.Compute.Inf event["compute.node_groups._is_defined"] = len(api.Compute.NodeGroups) > 0 + event["compute.node_groups._len"] = len(api.Compute.NodeGroups) } if api.Predictor != nil { From 6dfcf65a35bc9acffed91d02c001b7f3b83949cc Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Fri, 16 Apr 2021 21:10:07 -0700 Subject: [PATCH 7/7] Update k8s.go --- pkg/operator/operator/k8s.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go index a867f3c465..174f77627a 100644 --- a/pkg/operator/operator/k8s.go +++ b/pkg/operator/operator/k8s.go @@ -1272,8 +1272,6 @@ func GenerateResourceTolerations() []kcore.Toleration { } func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity { - preferredAffinities := []kcore.PreferredSchedulingTerm{} - // node groups are ordered according to how the cluster config node groups are ordered var nodeGroups []*clusterconfig.NodeGroup for _, clusterNodeGroup := range config.ManagedConfig.NodeGroups { @@ -1285,12 +1283,14 @@ func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity { } numNodeGroups := len(apiNodeGroups) - requiredNodeGroups := []string{} if apiNodeGroups == nil { nodeGroups = config.ManagedConfig.NodeGroups numNodeGroups = len(config.ManagedConfig.NodeGroups) } + requiredNodeGroups := []string{} + preferredAffinities := []kcore.PreferredSchedulingTerm{} + for idx, nodeGroup := range nodeGroups { var nodeGroupPrefix string if nodeGroup.Spot {