diff --git a/docs/workloads/batch/configuration.md b/docs/workloads/batch/configuration.md
index 1effda8ed7..a6d72fd96c 100644
--- a/docs/workloads/batch/configuration.md
+++ b/docs/workloads/batch/configuration.md
@@ -14,6 +14,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -50,6 +51,7 @@
     tensorflow_serving_image: # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -80,6 +82,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md
index e11aa1d02a..20280ab5d3 100644
--- a/docs/workloads/realtime/configuration.md
+++ b/docs/workloads/realtime/configuration.md
@@ -28,6 +28,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -85,6 +86,7 @@
     tensorflow_serving_image: # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -136,6 +138,7 @@
     image: # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
     env: # dictionary of environment variables
     log_level: # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: # the endpoint for the API (default: )
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go
index d0844d9537..6bb1196bcc 100644
--- a/pkg/operator/operator/k8s.go
+++ b/pkg/operator/operator/k8s.go
@@ -140,7 +140,6 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume
             apiPodResourceList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
             apiPodResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
         }
-
     } else {
         volumes = append(volumes, kcore.Volume{
             Name: "neuron-sock",
@@ -173,6 +172,22 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume
         containers = append(containers, neuronContainer)
     }
 
+    if api.Predictor.ShmSize != nil {
+        volumes = append(volumes, kcore.Volume{
+            Name: "dshm",
+            VolumeSource: kcore.VolumeSource{
+                EmptyDir: &kcore.EmptyDirVolumeSource{
+                    Medium:    kcore.StorageMediumMemory,
+                    SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+                },
+            },
+        })
+        apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
+            Name:      "dshm",
+            MountPath: "/dev/shm",
+        })
+    }
+
     containers = append(containers, kcore.Container{
         Name:  APIContainerName,
         Image: api.Predictor.Image,
@@ -262,6 +277,22 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
         containers = append(containers, neuronContainer)
     }
 
+    if api.Predictor.ShmSize != nil {
+        volumes = append(volumes, kcore.Volume{
+            Name: "dshm",
+            VolumeSource: kcore.VolumeSource{
+                EmptyDir: &kcore.EmptyDirVolumeSource{
+                    Medium:    kcore.StorageMediumMemory,
+                    SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+                },
+            },
+        })
+        volumeMounts = append(volumeMounts, kcore.VolumeMount{
+            Name:      "dshm",
+            MountPath: "/dev/shm",
+        })
+    }
+
     containers = append(containers, kcore.Container{
         Name:  APIContainerName,
         Image: api.Predictor.Image,
@@ -294,9 +325,11 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
     return containers, volumes
 }
 
-func ONNXPredictorContainers(api *spec.API) []kcore.Container {
+func ONNXPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume) {
     resourceList := kcore.ResourceList{}
     resourceLimitsList := kcore.ResourceList{}
+    apiPodVolumeMounts := defaultVolumeMounts()
+    volumes := DefaultVolumes()
     containers := []kcore.Container{}
 
     if api.Compute.CPU != nil {
@@ -316,13 +349,29 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
         resourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
     }
 
+    if api.Predictor.ShmSize != nil {
+        volumes = append(volumes, kcore.Volume{
+            Name: "dshm",
+            VolumeSource: kcore.VolumeSource{
+                EmptyDir: &kcore.EmptyDirVolumeSource{
+                    Medium:    kcore.StorageMediumMemory,
+                    SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+                },
+            },
+        })
+        apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
+            Name:      "dshm",
+            MountPath: "/dev/shm",
+        })
+    }
+
     containers = append(containers, kcore.Container{
         Name:            APIContainerName,
         Image:           api.Predictor.Image,
         ImagePullPolicy: kcore.PullAlways,
         Env:             getEnvVars(api, APIContainerName),
         EnvFrom:         baseEnvVars(),
-        VolumeMounts:    defaultVolumeMounts(),
+        VolumeMounts:    apiPodVolumeMounts,
         ReadinessProbe:  FileExistsProbe(_apiReadinessFile),
         LivenessProbe:   _apiLivenessProbe,
         Lifecycle:       nginxGracefulStopper(api.Kind),
@@ -338,7 +387,7 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
         },
     })
 
-    return containers
+    return containers, volumes
 }
 
 func getEnvVars(api *spec.API, container string) []kcore.EnvVar {
diff --git a/pkg/operator/resources/batchapi/k8s_specs.go b/pkg/operator/resources/batchapi/k8s_specs.go
index a1fbd58d95..c2c615a445 100644
--- a/pkg/operator/resources/batchapi/k8s_specs.go
+++ b/pkg/operator/resources/batchapi/k8s_specs.go
@@ -144,7 +144,7 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, erro
 }
 
 func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
-    containers := operator.ONNXPredictorContainers(api)
+    containers, volumes := operator.ONNXPredictorContainers(api)
 
     for i, container := range containers {
         if container.Name == operator.APIContainerName {
@@ -186,7 +186,7 @@ func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
                     "workload": "true",
                 },
                 Tolerations:        operator.Tolerations,
-                Volumes:            operator.DefaultVolumes(),
+                Volumes:            volumes,
                 ServiceAccountName: "default",
             },
         },
diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go
index 300c1153e0..5667703dc2 100644
--- a/pkg/operator/resources/realtimeapi/k8s_specs.go
+++ b/pkg/operator/resources/realtimeapi/k8s_specs.go
@@ -152,7 +152,7 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo
 }
 
 func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment {
-    containers := operator.ONNXPredictorContainers(api)
+    containers, volumes := operator.ONNXPredictorContainers(api)
 
     if config.Provider == types.AWSProviderType {
         containers = append(containers, operator.RequestMonitorContainer(api))
@@ -196,7 +196,7 @@ func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deploym
                     "workload": "true",
                 },
                 Tolerations:        operator.Tolerations,
-                Volumes:            operator.DefaultVolumes(),
+                Volumes:            volumes,
                 ServiceAccountName: "default",
             },
         },
diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go
index f7130ff791..fcb979e5d8 100644
--- a/pkg/types/spec/errors.go
+++ b/pkg/types/spec/errors.go
@@ -23,6 +23,7 @@ import (
     "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/files"
+    "github.com/cortexlabs/cortex/pkg/lib/k8s"
     libmath "github.com/cortexlabs/cortex/pkg/lib/math"
     "github.com/cortexlabs/cortex/pkg/lib/sets/strset"
     s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -52,6 +53,8 @@ const (
 
     ErrModelCachingNotSupportedWhenMultiprocessingEnabled = "spec.model_caching_not_supported_when_multiprocessing_enabled"
 
+    ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem"
+
     ErrFileNotFound              = "spec.file_not_found"
     ErrDirIsEmpty                = "spec.dir_is_empty"
     ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path"
@@ -228,6 +231,13 @@ func ErrorSurgeAndUnavailableBothZero() error {
     })
 }
 
+func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error {
+    return errors.WithStack(&errors.Error{
+        Kind:    ErrShmSizeCannotExceedMem,
+        Message: fmt.Sprintf("predictor.shm_size (%s) cannot exceed compute.mem (%s)", shmSize.UserString, mem.UserString),
+    })
+}
+
 func ErrorModelCachingNotSupportedWhenMultiprocessingEnabled(desiredProcesses int32) error {
     const maxNumProcesses int32 = 1
     return errors.WithStack(&errors.Error{
diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go
index 61872057f0..4f2c8cd9f8 100644
--- a/pkg/types/spec/validations.go
+++ b/pkg/types/spec/validations.go
@@ -203,6 +203,14 @@ func predictorValidation() *cr.StructFieldValidation {
                     GreaterThanOrEqualTo: pointer.Int32(1),
                 },
             },
+            {
+                StructField: "ShmSize",
+                StringPtrValidation: &cr.StringPtrValidation{
+                    Default:           nil,
+                    AllowExplicitNull: true,
+                },
+                Parser: k8s.QuantityParser(&k8s.QuantityValidation{}),
+            },
             {
                 StructField: "LogLevel",
                 StringValidation: &cr.StringValidation{
@@ -795,6 +803,12 @@ func ValidateAPI(
         }
     }
 
+    if api.Predictor != nil && api.Predictor.ShmSize != nil && api.Compute.Mem != nil {
+        if api.Predictor.ShmSize.Cmp(api.Compute.Mem.Quantity) > 0 {
+            return ErrorShmSizeCannotExceedMem(*api.Predictor.ShmSize, *api.Compute.Mem)
+        }
+    }
+
     return nil
 }
 
diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go
index 613a8796e5..74ff3f2b0d 100644
--- a/pkg/types/userconfig/api.go
+++ b/pkg/types/userconfig/api.go
@@ -53,6 +53,7 @@ type Predictor struct {
     ServerSideBatching  *ServerSideBatching `json:"server_side_batching" yaml:"server_side_batching"`
     ProcessesPerReplica int32               `json:"processes_per_replica" yaml:"processes_per_replica"`
     ThreadsPerProcess   int32               `json:"threads_per_process" yaml:"threads_per_process"`
+    ShmSize             *k8s.Quantity       `json:"shm_size" yaml:"shm_size"`
     PythonPath          *string             `json:"python_path" yaml:"python_path"`
     LogLevel            LogLevel            `json:"log_level" yaml:"log_level"`
     Image               string              `json:"image" yaml:"image"`
@@ -370,6 +371,10 @@ func (predictor *Predictor) UserStr() string {
     sb.WriteString(fmt.Sprintf("%s: %s\n", ProcessesPerReplicaKey, s.Int32(predictor.ProcessesPerReplica)))
     sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerProcessKey, s.Int32(predictor.ThreadsPerProcess)))
 
+    if predictor.ShmSize != nil {
+        sb.WriteString(fmt.Sprintf("%s: %s\n", ShmSize, predictor.ShmSize.UserString))
+    }
+
     if len(predictor.Config) > 0 {
         sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey))
         d, _ := yaml.Marshal(&predictor.Config)
@@ -603,6 +608,11 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface
     event["predictor.type"] = api.Predictor.Type
     event["predictor.processes_per_replica"] = api.Predictor.ProcessesPerReplica
     event["predictor.threads_per_process"] = api.Predictor.ThreadsPerProcess
+
+    if api.Predictor.ShmSize != nil {
+        event["predictor.shm_size"] = api.Predictor.ShmSize.String()
+    }
+
     event["predictor.log_level"] = api.Predictor.LogLevel
 
     if api.Predictor.PythonPath != nil {
diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go
index f42334431d..f0a651374e 100644
--- a/pkg/types/userconfig/config_key.go
+++ b/pkg/types/userconfig/config_key.go
@@ -39,6 +39,7 @@ const (
     TensorFlowServingImageKey = "tensorflow_serving_image"
     ProcessesPerReplicaKey    = "processes_per_replica"
     ThreadsPerProcessKey      = "threads_per_process"
+    ShmSize                   = "shm_size"
     LogLevelKey               = "log_level"
     ConfigKey                 = "config"
     EnvKey                    = "env"
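
For reference, a minimal sketch of how the new shm_size field could appear in an API configuration. The API name, predictor path, and sizes below are hypothetical and not taken from this diff; the only constraint the diff enforces is that shm_size may not exceed compute.mem.

- name: my-api            # hypothetical API name
  kind: RealtimeAPI
  predictor:
    type: python
    path: predictor.py    # hypothetical predictor file
    shm_size: 256Mi       # mounts a memory-backed emptyDir at /dev/shm in the API container
  compute:
    mem: 1Gi              # must be >= shm_size, otherwise validation fails with ErrShmSizeCannotExceedMem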