Skip to content

Add gRPC support to RealtimeAPI kind #1997

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6027b1e
Add parts of grpc POC
RobertLucian Mar 19, 2021
3366c57
Small changes
RobertLucian Mar 23, 2021
0e666dc
Remove POC files
RobertLucian Mar 23, 2021
415a7c6
Implement Go-side for grpc
RobertLucian Mar 23, 2021
1114962
Go/Python implementations for grpc
RobertLucian Mar 23, 2021
baf6db0
gRPC on the serving container
RobertLucian Mar 24, 2021
e2922d3
Add an iris classifier example for gRPC
RobertLucian Mar 24, 2021
645b9bc
Add grpc-tools to requirements.txt file
RobertLucian Mar 24, 2021
07e2efc
Extend the gRPC iris-classifier example
RobertLucian Mar 24, 2021
816b59b
General fixes for gRPC
RobertLucian Mar 25, 2021
18aebab
Add post metrics for cortex get to gRPC
RobertLucian Mar 25, 2021
ea33ddf
Track streaming gRPC APIs
RobertLucian Mar 25, 2021
3fc8625
Address Vishal's comments
RobertLucian Mar 25, 2021
c931b08
Prevent the use of query_params, headers & batch_id on gRPC implement…
RobertLucian Mar 25, 2021
6ee0d7a
Fix typo
RobertLucian Mar 25, 2021
13d7f46
Fix error message
RobertLucian Mar 25, 2021
771c205
Prevent converting http->grpc when api is used by traffic splitter
RobertLucian Mar 25, 2021
795735f
Fix latency/response code metrics
RobertLucian Mar 25, 2021
74a689e
More bug fixes and tweaks
RobertLucian Mar 26, 2021
e84078c
Fix uvicorn servers
RobertLucian Mar 26, 2021
be4de0c
Fix path rewrite for realtime API
RobertLucian Mar 26, 2021
f78e09a
Add prime generator example
RobertLucian Mar 26, 2021
e6901a4
Add gRPC tests
RobertLucian Mar 27, 2021
2bbe246
Test suite fixes
RobertLucian Mar 27, 2021
6fd3200
Address PR comments
RobertLucian Mar 27, 2021
9a34b58
Fix tests
RobertLucian Mar 27, 2021
6a23353
Fix proto validation
RobertLucian Mar 27, 2021
1240682
Merge branch 'master' into feature/grpc
RobertLucian Mar 27, 2021
a3211f8
Last fixes (async/grpc)
RobertLucian Mar 27, 2021
2a0c1c7
Make lint
RobertLucian Mar 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cli/cmd/lib_realtime_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,14 @@ func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment)
out += "\n" + console.Bold("metrics dashboard: ") + *realtimeAPI.DashboardURL + "\n"
}

out += "\n" + console.Bold("endpoint: ") + realtimeAPI.Endpoint + "\n"
if realtimeAPI.Spec.Predictor.IsGRPC() {
out += "\n" + console.Bold("insecure endpoint: ") + fmt.Sprintf("%s:%d", realtimeAPI.Endpoint, realtimeAPI.GRPCPorts["insecure"])
out += "\n" + console.Bold("secure endpoint: ") + fmt.Sprintf("%s:%d", realtimeAPI.Endpoint, realtimeAPI.GRPCPorts["secure"]) + "\n"
} else {
out += "\n" + console.Bold("endpoint: ") + realtimeAPI.Endpoint + "\n"
}

if !(realtimeAPI.Spec.Predictor.Type == userconfig.PythonPredictorType && realtimeAPI.Spec.Predictor.MultiModelReloading == nil) {
if !(realtimeAPI.Spec.Predictor.Type == userconfig.PythonPredictorType && realtimeAPI.Spec.Predictor.MultiModelReloading == nil) && realtimeAPI.Spec.Predictor.ProtobufPath == nil {
out += "\n" + describeModelInput(realtimeAPI.Status, realtimeAPI.Spec.Predictor, realtimeAPI.Endpoint)
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/docker/docker v0.0.0-00010101000000-000000000000
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/emicklei/proto v1.9.0
github.com/fatih/color v1.10.0
github.com/getsentry/sentry-go v0.8.0
github.com/go-ole/go-ole v1.2.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZi
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/proto v1.9.0 h1:l0QiNT6Qs7Yj0Mb4X6dnWBQer4ebei2BFcgQLbGqUDc=
github.com/emicklei/proto v1.9.0/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
4 changes: 2 additions & 2 deletions manager/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ function cluster_up() {
setup_grafana
echo "✓"

echo -n "○ configuring gpu support (for the nodegroups that may require it)"
echo -n "○ configuring gpu support (for the nodegroups that may require it) "
envsubst < manifests/nvidia.yaml | kubectl apply -f - >/dev/null
NVIDIA_COM_GPU_VALUE=true envsubst < manifests/prometheus-dcgm-exporter.yaml | kubectl apply -f - >/dev/null
echo "✓"

echo -n "○ configuring inf support (for the nodegroups that may require it)"
echo -n "○ configuring inf support (for the nodegroups that may require it) "
envsubst < manifests/inferentia.yaml | kubectl apply -f - >/dev/null
echo "✓"

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/serve/cortex_internal.requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
grpcio==1.32.0
grpcio==1.36.0
boto3==1.14.53
datadog==0.39.0
dill>=0.3.1.1
Expand Down
15 changes: 15 additions & 0 deletions pkg/cortex/serve/cortex_internal/lib/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def post_request_metrics(self, status_code, total_time):
]
self.post_metrics(metrics)

def post_status_code_request_metrics(self, status_code):
metrics = [
self.status_code_metric(self.metric_dimensions(), status_code),
self.status_code_metric(self.metric_dimensions_with_id(), status_code),
]
self.post_metrics(metrics)

def post_latency_request_metrics(self, total_time):
total_time_ms = total_time * 1000
metrics = [
self.latency_metric(self.metric_dimensions(), total_time_ms),
self.latency_metric(self.metric_dimensions_with_id(), total_time_ms),
]
self.post_metrics(metrics)

def post_metrics(self, metrics):
try:
if self.statsd is None:
Expand Down
21 changes: 14 additions & 7 deletions pkg/cortex/serve/cortex_internal/lib/api/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from cortex_internal.lib.api.validations import (
validate_class_impl,
validate_python_predictor_with_models,
validate_predictor_with_grpc,
are_models_specified,
)
from cortex_internal.lib.client.onnx import ONNXClient
Expand Down Expand Up @@ -61,12 +62,12 @@
{
"name": "__init__",
"required_args": ["self", "config"],
"optional_args": ["job_spec", "python_client", "metrics_client"],
"optional_args": ["job_spec", "python_client", "metrics_client", "proto_module_pb2"],
},
{
"name": "predict",
"required_args": ["self"],
"optional_args": ["payload", "query_params", "headers", "batch_id"],
"optional_args": ["payload", "query_params", "headers", "batch_id", "context"],
},
],
"optional": [
Expand All @@ -88,12 +89,12 @@
{
"name": "__init__",
"required_args": ["self", "tensorflow_client", "config"],
"optional_args": ["job_spec", "metrics_client"],
"optional_args": ["job_spec", "metrics_client", "proto_module_pb2"],
},
{
"name": "predict",
"required_args": ["self"],
"optional_args": ["payload", "query_params", "headers", "batch_id"],
"optional_args": ["payload", "query_params", "headers", "batch_id", "context"],
},
],
"optional": [
Expand All @@ -111,12 +112,12 @@
{
"name": "__init__",
"required_args": ["self", "onnx_client", "config"],
"optional_args": ["job_spec", "metrics_client"],
"optional_args": ["job_spec", "metrics_client", "proto_module_pb2"],
},
{
"name": "predict",
"required_args": ["self"],
"optional_args": ["payload", "query_params", "headers", "batch_id"],
"optional_args": ["payload", "query_params", "headers", "batch_id", "context"],
},
],
"optional": [
Expand Down Expand Up @@ -146,6 +147,7 @@ def __init__(self, api_spec: dict, model_dir: str):
self.type = predictor_type_from_api_spec(api_spec)
self.path = api_spec["predictor"]["path"]
self.config = api_spec["predictor"].get("config", {})
self.protobuf_path = api_spec["predictor"].get("protobuf_path")

self.api_spec = api_spec

Expand Down Expand Up @@ -234,12 +236,14 @@ def initialize_impl(
project_dir: str,
client: Union[PythonClient, TensorFlowClient, ONNXClient],
metrics_client: DogStatsd,
job_spec: Dict[str, Any] = None,
job_spec: Optional[Dict[str, Any]] = None,
proto_module_pb2: Optional[Any] = None,
):
"""
Initialize predictor class as provided by the user.

job_spec is a dictionary when the "kind" of the API is set to "BatchAPI". Otherwise, it's None.
proto_module_pb2 is a module of the compiled proto when grpc is enabled for the "RealtimeAPI" kind. Otherwise, it's None.

Can raise UserRuntimeException/UserException/CortexException.
"""
Expand All @@ -257,6 +261,8 @@ def initialize_impl(
args["job_spec"] = job_spec
if "metrics_client" in constructor_args:
args["metrics_client"] = metrics_client
if "proto_module_pb2" in constructor_args:
args["proto_module_pb2"] = proto_module_pb2

# initialize predictor class
try:
Expand Down Expand Up @@ -328,6 +334,7 @@ def class_impl(self, project_dir):

try:
validate_class_impl(predictor_class, validations)
validate_predictor_with_grpc(predictor_class, self.api_spec)
if self.type == PythonPredictorType:
validate_python_predictor_with_models(predictor_class, self.api_spec)
except Exception as e:
Expand Down
79 changes: 60 additions & 19 deletions pkg/cortex/serve/cortex_internal/lib/api/validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import inspect
from typing import Dict

from cortex_internal.lib import util
from cortex_internal.lib.exceptions import UserException
from cortex_internal.lib.type import predictor_type_from_api_spec, PythonPredictorType

Expand Down Expand Up @@ -90,27 +91,28 @@ def validate_required_method_args(impl, func_signature):


def validate_python_predictor_with_models(impl, api_spec):
target_class_name = impl.__name__
if not are_models_specified(api_spec):
return

if are_models_specified(api_spec):
constructor = getattr(impl, "__init__")
constructor_arg_spec = inspect.getfullargspec(constructor)
if "python_client" not in constructor_arg_spec.args:
raise UserException(
f"class {target_class_name}",
f'invalid signature for method "__init__"',
f'"python_client" is a required argument, but was not provided',
f"when the python predictor type is used and models are specified in the api spec, "
f'adding the "python_client" argument is required',
)
target_class_name = impl.__name__
constructor = getattr(impl, "__init__")
constructor_arg_spec = inspect.getfullargspec(constructor)
if "python_client" not in constructor_arg_spec.args:
raise UserException(
f"class {target_class_name}",
f'invalid signature for method "__init__"',
f'"python_client" is a required argument, but was not provided',
f"when the python predictor type is used and models are specified in the api spec, "
f'adding the "python_client" argument is required',
)

if getattr(impl, "load_model", None) is None:
raise UserException(
f"class {target_class_name}",
f'required method "load_model" is not defined',
f"when the python predictor type is used and models are specified in the api spec, "
f'adding the "load_model" method is required',
)
if getattr(impl, "load_model", None) is None:
raise UserException(
f"class {target_class_name}",
f'required method "load_model" is not defined',
f"when the python predictor type is used and models are specified in the api spec, "
f'adding the "load_model" method is required',
)


def are_models_specified(api_spec: Dict) -> bool:
Expand All @@ -130,3 +132,42 @@ def are_models_specified(api_spec: Dict) -> bool:
return False

return models is not None


def is_grpc_enabled(api_spec: Dict) -> bool:
    """
    Checks if the API has the grpc protocol enabled (cortex.yaml).

    Args:
        api_spec: API configuration.

    Returns:
        True when a protobuf path is configured for the predictor, False otherwise.
    """
    # use .get() for consistency with api.py, which reads this field with
    # api_spec["predictor"].get("protobuf_path"); direct indexing would raise
    # KeyError for specs that omit the key entirely
    return api_spec["predictor"].get("protobuf_path") is not None


def validate_predictor_with_grpc(impl, api_spec):
    """
    Validates the predictor class against the extra constraints that apply when
    the grpc protocol is enabled (i.e. "protobuf_path" is set in the api spec).

    Raises UserException when the class signature doesn't meet the grpc contract.
    """
    if not is_grpc_enabled(api_spec):
        return

    target_class_name = impl.__name__

    # grpc predictors receive the compiled protobuf module through the constructor
    constructor = getattr(impl, "__init__")
    constructor_arg_spec = inspect.getfullargspec(constructor)
    if "proto_module_pb2" not in constructor_arg_spec.args:
        raise UserException(
            f"class {target_class_name}",
            f'invalid signature for method "__init__"',
            f'"proto_module_pb2" is a required argument, but was not provided',
            f"when a protobuf is specified in the api spec, then that means the grpc protocol is enabled, "
            f'which means that adding the "proto_module_pb2" argument is required',
        )

    # http-only arguments are not available over grpc; reject them up-front.
    # filter against a fixed tuple (instead of list(set(...).intersection(...)))
    # so the arguments appear in the error message in a deterministic order
    predictor = getattr(impl, "predict")
    predictor_arg_spec = inspect.getfullargspec(predictor)
    disallowed_params = [
        arg for arg in ("query_params", "headers", "batch_id") if arg in predictor_arg_spec.args
    ]
    if len(disallowed_params) > 0:
        raise UserException(
            f"class {target_class_name}",
            f'invalid signature for method "predict"',
            f'{util.string_plural_with_s("argument", len(disallowed_params))} {util.and_list_with_quotes(disallowed_params)} cannot be used when the grpc protocol is enabled',
        )
41 changes: 41 additions & 0 deletions pkg/cortex/serve/cortex_internal/lib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,47 @@ def is_float_or_int_list(var):
return True


def and_list_with_quotes(values: List) -> str:
    """
    Converts a list like ["a", "b", "c"] to '"a", "b" and "c"'.

    Returns an empty string for an empty list.
    """
    quoted = ['"' + value + '"' for value in values]
    if not quoted:
        return ""
    if len(quoted) == 1:
        return quoted[0]
    # comma-separate everything except the last item, which is joined with "and"
    return ", ".join(quoted[:-1]) + " and " + quoted[-1]


def or_list_with_quotes(values: List) -> str:
    """
    Converts a list like ["a", "b", "c"] to '"a", "b" or "c"'.

    Returns an empty string for an empty list.
    """
    quoted = ['"' + value + '"' for value in values]
    if not quoted:
        return ""
    if len(quoted) == 1:
        return quoted[0]
    # comma-separate everything except the last item, which is joined with "or"
    return ", ".join(quoted[:-1]) + " or " + quoted[-1]


def string_plural_with_s(string: str, count: int) -> str:
    """
    Pluralize the word with an "s" character if the count is greater than 1.
    """
    # counts of 0 and 1 keep the singular form
    return string + "s" if count > 1 else string


def render_jinja_template(jinja_template_file: str, context: dict) -> str:
from jinja2 import Environment, FileSystemLoader

Expand Down
20 changes: 17 additions & 3 deletions pkg/cortex/serve/init/bootloader.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,25 @@ create_s6_service_from_file() {

# prepare webserver
if [ "$CORTEX_KIND" = "RealtimeAPI" ]; then
if [ $CORTEX_SERVING_PROTOCOL = "http" ]; then
mkdir /run/servers
fi

if [ $CORTEX_SERVING_PROTOCOL = "grpc" ]; then
/opt/conda/envs/env/bin/python -m grpc_tools.protoc --proto_path=$CORTEX_PROJECT_DIR --python_out=$CORTEX_PYTHON_PATH --grpc_python_out=$CORTEX_PYTHON_PATH $CORTEX_PROTOBUF_FILE
fi

# prepare uvicorn workers
mkdir /run/uvicorn
# prepare servers
for i in $(seq 1 $CORTEX_PROCESSES_PER_REPLICA); do
create_s6_service "uvicorn-$((i-1))" "cd /mnt/project && $source_env_file_cmd && PYTHONUNBUFFERED=TRUE PYTHONPATH=$PYTHONPATH:$CORTEX_PYTHON_PATH exec /opt/conda/envs/env/bin/python /src/cortex/serve/start/server.py /run/uvicorn/proc-$((i-1)).sock"
# prepare uvicorn workers
if [ $CORTEX_SERVING_PROTOCOL = "http" ]; then
create_s6_service "uvicorn-$((i-1))" "cd /mnt/project && $source_env_file_cmd && PYTHONUNBUFFERED=TRUE PYTHONPATH=$PYTHONPATH:$CORTEX_PYTHON_PATH exec /opt/conda/envs/env/bin/python /src/cortex/serve/start/server.py /run/servers/proc-$((i-1)).sock"
fi

# prepare grpc workers
if [ $CORTEX_SERVING_PROTOCOL = "grpc" ]; then
create_s6_service "grpc-$((i-1))" "cd /mnt/project && $source_env_file_cmd && PYTHONUNBUFFERED=TRUE PYTHONPATH=$PYTHONPATH:$CORTEX_PYTHON_PATH exec /opt/conda/envs/env/bin/python /src/cortex/serve/start/server_grpc.py localhost:$((i-1+20000))"
fi
done

# generate nginx conf
Expand Down
Loading