From 0c6b94bd28723ad194ed08b6bc35c09d6ad05701 Mon Sep 17 00:00:00 2001
From: vishal
Date: Mon, 4 Jan 2021 14:32:16 -0500
Subject: [PATCH 1/3] Allow users to configure shared memory for IPC

---
 docs/workloads/batch/configuration.md        |  3 +
 docs/workloads/realtime/configuration.md     |  3 +
 pkg/operator/operator/k8s.go                 | 57 +++++++++++++++++--
 pkg/operator/resources/batchapi/k8s_specs.go |  4 +-
 .../resources/realtimeapi/k8s_specs.go       |  4 +-
 pkg/types/spec/errors.go                     | 11 ++++
 pkg/types/spec/validations.go                | 16 ++++++
 pkg/types/userconfig/api.go                  | 10 ++++
 pkg/types/userconfig/config_key.go           |  1 +
 9 files changed, 101 insertions(+), 8 deletions(-)

diff --git a/docs/workloads/batch/configuration.md b/docs/workloads/batch/configuration.md
index 0a7653b090..02f3b66129 100644
--- a/docs/workloads/batch/configuration.md
+++ b/docs/workloads/batch/configuration.md
@@ -14,6 +14,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -50,6 +51,7 @@
     tensorflow_serving_image: # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -80,6 +82,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md
index bf18e9f634..5cc83b4814 100644
--- a/docs/workloads/realtime/configuration.md
+++ b/docs/workloads/realtime/configuration.md
@@ -28,6 +28,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -85,6 +86,7 @@
     tensorflow_serving_image: # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -136,6 +138,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
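The docs additions above mean an API spec can now request a larger /dev/shm for its predictor. As a rough usage sketch (the API name, predictor path, and sizes below are illustrative only, not taken from this patch), a realtime API might set:

    - name: text-generator
      kind: RealtimeAPI
      predictor:
        type: python
        path: predictor.py
        processes_per_replica: 4
        shm_size: 256Mi   # mounted as a memory-backed emptyDir at /dev/shm
      compute:
        cpu: 1
        mem: 1Gi          # shm_size may not exceed this (see the validation added later in this patch)

Leaving shm_size unset (the Null default) keeps the container runtime's default /dev/shm size.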
diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go
index d0844d9537..6668ed7b49 100644
--- a/pkg/operator/operator/k8s.go
+++ b/pkg/operator/operator/k8s.go
@@ -140,7 +140,6 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume
             apiPodResourceList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
             apiPodResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
         }
-
     } else {
         volumes = append(volumes, kcore.Volume{
             Name: "neuron-sock",
@@ -173,6 +172,22 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume
         containers = append(containers, neuronContainer)
     }
 
+    if api.Predictor.ShmSize != nil {
+        volumes = append(volumes, kcore.Volume{
+            Name: "dshm",
+            VolumeSource: kcore.VolumeSource{
+                EmptyDir: &kcore.EmptyDirVolumeSource{
+                    Medium:    kcore.StorageMediumMemory,
+                    SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+                },
+            },
+        })
+        apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
+            Name:      "dshm",
+            MountPath: "/dev/shm",
+        })
+    }
+
     containers = append(containers, kcore.Container{
         Name:  APIContainerName,
         Image: api.Predictor.Image,
@@ -262,6 +277,22 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
         containers = append(containers, neuronContainer)
     }
 
+    if api.Predictor.ShmSize != nil {
+        volumes = append(volumes, kcore.Volume{
+            Name: "dshm",
+            VolumeSource: kcore.VolumeSource{
+                EmptyDir: &kcore.EmptyDirVolumeSource{
+                    Medium:    kcore.StorageMediumMemory,
+                    SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+                },
+            },
+        })
+        volumeMounts = append(volumeMounts, kcore.VolumeMount{
+            Name:      "dshm",
+            MountPath: "/dev/shm",
+        })
+    }
+
     containers = append(containers, kcore.Container{
         Name:  APIContainerName,
         Image: api.Predictor.Image,
@@ -294,10 +325,12 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
     return containers, volumes
 }
 
-func ONNXPredictorContainers(api *spec.API) []kcore.Container {
+func ONNXPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume) {
     resourceList := kcore.ResourceList{}
     resourceLimitsList := kcore.ResourceList{}
     containers := []kcore.Container{}
+    apiPodVolumeMounts := defaultVolumeMounts()
+    volumes := DefaultVolumes()
 
     if api.Compute.CPU != nil {
         userPodCPURequest := k8s.QuantityPtr(api.Compute.CPU.Quantity.DeepCopy())
@@ -316,13 +349,29 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
         resourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
     }
 
+    if api.Predictor.ShmSize != nil {
+        volumes = append(volumes, kcore.Volume{
+            Name: "dshm",
+            VolumeSource: kcore.VolumeSource{
+                EmptyDir: &kcore.EmptyDirVolumeSource{
+                    Medium:    kcore.StorageMediumMemory,
+                    SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+                },
+            },
+        })
+        apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
+            Name:      "dshm",
+            MountPath: "/dev/shm",
+        })
+    }
+
     containers = append(containers, kcore.Container{
         Name:            APIContainerName,
         Image:           api.Predictor.Image,
         ImagePullPolicy: kcore.PullAlways,
         Env:             getEnvVars(api, APIContainerName),
         EnvFrom:         baseEnvVars(),
-        VolumeMounts:    defaultVolumeMounts(),
+        VolumeMounts:    apiPodVolumeMounts,
         ReadinessProbe:  FileExistsProbe(_apiReadinessFile),
         LivenessProbe:   _apiLivenessProbe,
         Lifecycle:       nginxGracefulStopper(api.Kind),
@@ -338,7 +387,7 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
         },
     })
 
-    return containers
+    return containers, volumes
 }
 
 func getEnvVars(api *spec.API, container string) []kcore.EnvVar {
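In Kubernetes terms, the dshm volume added to the predictor containers above serializes to roughly the following pod-spec fragment (a sketch; the 256Mi value is hypothetical, the real size comes from predictor.shm_size):

    volumes:
      - name: dshm
        emptyDir:
          medium: Memory
          sizeLimit: 256Mi   # from predictor.shm_size
    ...
    volumeMounts:
      - name: dshm
        mountPath: /dev/shm

Because the emptyDir medium is Memory, files written under /dev/shm are RAM-backed and count toward the pod's memory consumption, which is presumably why the size is capped at compute.mem in the validation added further down.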
diff --git a/pkg/operator/resources/batchapi/k8s_specs.go b/pkg/operator/resources/batchapi/k8s_specs.go
index a1fbd58d95..c2c615a445 100644
--- a/pkg/operator/resources/batchapi/k8s_specs.go
+++ b/pkg/operator/resources/batchapi/k8s_specs.go
@@ -144,7 +144,7 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, erro
 }
 
 func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
-    containers := operator.ONNXPredictorContainers(api)
+    containers, volumes := operator.ONNXPredictorContainers(api)
 
     for i, container := range containers {
         if container.Name == operator.APIContainerName {
@@ -186,7 +186,7 @@ func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
                         "workload": "true",
                     },
                     Tolerations:        operator.Tolerations,
-                    Volumes:            operator.DefaultVolumes(),
+                    Volumes:            volumes,
                     ServiceAccountName: "default",
                 },
             },
diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go
index 300c1153e0..5667703dc2 100644
--- a/pkg/operator/resources/realtimeapi/k8s_specs.go
+++ b/pkg/operator/resources/realtimeapi/k8s_specs.go
@@ -152,7 +152,7 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo
 }
 
 func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment {
-    containers := operator.ONNXPredictorContainers(api)
+    containers, volumes := operator.ONNXPredictorContainers(api)
 
     if config.Provider == types.AWSProviderType {
         containers = append(containers, operator.RequestMonitorContainer(api))
@@ -196,7 +196,7 @@ func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deploym
                         "workload": "true",
                     },
                     Tolerations:        operator.Tolerations,
-                    Volumes:            operator.DefaultVolumes(),
+                    Volumes:            volumes,
                     ServiceAccountName: "default",
                 },
             },
diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go
index f7130ff791..8818e3bcb8 100644
--- a/pkg/types/spec/errors.go
+++ b/pkg/types/spec/errors.go
@@ -23,6 +23,7 @@ import (
     "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/files"
+    "github.com/cortexlabs/cortex/pkg/lib/k8s"
     libmath "github.com/cortexlabs/cortex/pkg/lib/math"
     "github.com/cortexlabs/cortex/pkg/lib/sets/strset"
     s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -52,6 +53,8 @@ const (
 
     ErrModelCachingNotSupportedWhenMultiprocessingEnabled = "spec.model_caching_not_supported_when_multiprocessing_enabled"
 
+    ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem"
+
     ErrFileNotFound              = "spec.file_not_found"
     ErrDirIsEmpty                = "spec.dir_is_empty"
     ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path"
@@ -228,6 +231,14 @@ func ErrorSurgeAndUnavailableBothZero() error {
     })
 }
 
+func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error {
+    const maxNumProcesses int32 = 1
+    return errors.WithStack(&errors.Error{
+        Kind:    ErrShmSizeCannotExceedMem,
+        Message: fmt.Sprintf("predictor.shm_size (%s) cannot exceed compute.mem (%s)", shmSize.UserString, mem.UserString),
+    })
+}
+
 func ErrorModelCachingNotSupportedWhenMultiprocessingEnabled(desiredProcesses int32) error {
     const maxNumProcesses int32 = 1
     return errors.WithStack(&errors.Error{
diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go
index 94d923edea..dc6ce7a7f5 100644
--- a/pkg/types/spec/validations.go
+++ b/pkg/types/spec/validations.go
@@ -204,6 +204,16 @@ func predictorValidation() *cr.StructFieldValidation {
                     GreaterThanOrEqualTo: pointer.Int32(1),
                 },
             },
+            {
+                StructField: "ShmSize",
+                StringPtrValidation: &cr.StringPtrValidation{
+                    Default:           nil,
+                    AllowExplicitNull: true,
+                },
+                Parser: k8s.QuantityParser(&k8s.QuantityValidation{
+                    GreaterThanOrEqualTo: k8s.QuantityPtr(kresource.MustParse("20Mi")),
+                }),
+            },
             {
                 StructField: "LogLevel",
                 StringValidation: &cr.StringValidation{
@@ -832,6 +842,12 @@ func ValidateAPI(
         }
     }
 
+    if api.Predictor != nil && api.Predictor.ShmSize != nil && api.Compute.Mem != nil {
+        if api.Predictor.ShmSize.Cmp(api.Compute.Mem.Quantity) > 0 {
+            return ErrorShmSizeCannotExceedMem(*api.Predictor.ShmSize, *api.Compute.Mem)
+        }
+    }
+
     return nil
 }
 
diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go
index adaef3859b..4ffb26ca6e 100644
--- a/pkg/types/userconfig/api.go
+++ b/pkg/types/userconfig/api.go
@@ -54,6 +54,7 @@ type Predictor struct {
     ServerSideBatching  *ServerSideBatching `json:"server_side_batching" yaml:"server_side_batching"`
     ProcessesPerReplica int32               `json:"processes_per_replica" yaml:"processes_per_replica"`
     ThreadsPerProcess   int32               `json:"threads_per_process" yaml:"threads_per_process"`
+    ShmSize             *k8s.Quantity       `json:"shm_size" yaml:"shm_size"`
     PythonPath          *string             `json:"python_path" yaml:"python_path"`
     LogLevel            LogLevel            `json:"log_level" yaml:"log_level"`
     Image               string              `json:"image" yaml:"image"`
@@ -383,6 +384,10 @@ func (predictor *Predictor) UserStr() string {
     sb.WriteString(fmt.Sprintf("%s: %s\n", ProcessesPerReplicaKey, s.Int32(predictor.ProcessesPerReplica)))
     sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerProcessKey, s.Int32(predictor.ThreadsPerProcess)))
 
+    if predictor.ShmSize != nil {
+        sb.WriteString(fmt.Sprintf("%s: %s\n", ShmSize, predictor.ShmSize.UserString))
+    }
+
     if len(predictor.Config) > 0 {
         sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey))
         d, _ := yaml.Marshal(&predictor.Config)
@@ -633,6 +638,11 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface
     event["predictor.type"] = api.Predictor.Type
     event["predictor.processes_per_replica"] = api.Predictor.ProcessesPerReplica
     event["predictor.threads_per_process"] = api.Predictor.ThreadsPerProcess
+
+    if api.Predictor.ShmSize != nil {
+        event["predictor.shm_size"] = api.Predictor.ShmSize.String()
+    }
+
     event["predictor.log_level"] = api.Predictor.LogLevel
 
     if api.Predictor.PythonPath != nil {
diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go
index 1b36325511..0039aeecb3 100644
--- a/pkg/types/userconfig/config_key.go
+++ b/pkg/types/userconfig/config_key.go
@@ -40,6 +40,7 @@ const (
     TensorFlowServingImageKey = "tensorflow_serving_image"
     ProcessesPerReplicaKey    = "processes_per_replica"
     ThreadsPerProcessKey      = "threads_per_process"
+    ShmSize                   = "shm_size"
     LogLevelKey               = "log_level"
     ConfigKey                 = "config"
     EnvKey                    = "env"
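With the pieces above in place, a spec that requests more shared memory than RAM is rejected during validation. A hypothetical example (values are illustrative):

      predictor:
        type: python
        path: predictor.py
        shm_size: 2Gi
      compute:
        mem: 1Gi

would fail to deploy with an error of the form: predictor.shm_size (2Gi) cannot exceed compute.mem (1Gi).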
From 22a6ee043d078be102759fe8b1b05d3c21c52376 Mon Sep 17 00:00:00 2001
From: Vishal Bollu
Date: Mon, 4 Jan 2021 15:45:27 -0500
Subject: [PATCH 2/3] Update validations.go

---
 pkg/types/spec/validations.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go
index dc6ce7a7f5..c66bd224e1 100644
--- a/pkg/types/spec/validations.go
+++ b/pkg/types/spec/validations.go
@@ -210,9 +210,7 @@ func predictorValidation() *cr.StructFieldValidation {
                     Default:           nil,
                     AllowExplicitNull: true,
                 },
-                Parser: k8s.QuantityParser(&k8s.QuantityValidation{
-                    GreaterThanOrEqualTo: k8s.QuantityPtr(kresource.MustParse("20Mi")),
-                }),
+                Parser: k8s.QuantityParser(&k8s.QuantityValidation{}),
             },
             {
                 StructField: "LogLevel",

From e1f7ec3764c8611c739c8fe5cfb9a9f4d66b8feb Mon Sep 17 00:00:00 2001
From: vishal
Date: Mon, 4 Jan 2021 17:50:41 -0500
Subject: [PATCH 3/3] PR review

---
 pkg/operator/operator/k8s.go | 2 +-
 pkg/types/spec/errors.go     | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go
index 6668ed7b49..6bb1196bcc 100644
--- a/pkg/operator/operator/k8s.go
+++ b/pkg/operator/operator/k8s.go
@@ -328,9 +328,9 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
 func ONNXPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume) {
     resourceList := kcore.ResourceList{}
     resourceLimitsList := kcore.ResourceList{}
-    containers := []kcore.Container{}
     apiPodVolumeMounts := defaultVolumeMounts()
     volumes := DefaultVolumes()
+    containers := []kcore.Container{}
 
     if api.Compute.CPU != nil {
         userPodCPURequest := k8s.QuantityPtr(api.Compute.CPU.Quantity.DeepCopy())
diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go
index 8818e3bcb8..fcb979e5d8 100644
--- a/pkg/types/spec/errors.go
+++ b/pkg/types/spec/errors.go
@@ -232,7 +232,6 @@ func ErrorSurgeAndUnavailableBothZero() error {
 }
 
 func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error {
-    const maxNumProcesses int32 = 1
     return errors.WithStack(&errors.Error{
         Kind:    ErrShmSizeCannotExceedMem,
         Message: fmt.Sprintf("predictor.shm_size (%s) cannot exceed compute.mem (%s)", shmSize.UserString, mem.UserString),
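The shm_size vs compute.mem check relies on Kubernetes quantity semantics rather than string comparison, so mixed units such as 1G and 512Mi compare correctly. A minimal standalone sketch of that comparison, using the upstream k8s.io/apimachinery resource package directly (the PR itself goes through Cortex's pkg/lib/k8s wrapper, and the values below are hypothetical):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/api/resource"
    )

    func main() {
        shm := resource.MustParse("1Gi")   // hypothetical predictor.shm_size
        mem := resource.MustParse("512Mi") // hypothetical compute.mem

        // Cmp returns 1 when shm > mem, mirroring the check added in ValidateAPI:
        // api.Predictor.ShmSize.Cmp(api.Compute.Mem.Quantity) > 0
        if shm.Cmp(mem) > 0 {
            fmt.Printf("predictor.shm_size (%s) cannot exceed compute.mem (%s)\n", shm.String(), mem.String())
        }
    }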