Add support for ONNX and TF predictor types on AsyncAPI #1996

Merged: 9 commits merged on Mar 25, 2021

46 changes: 46 additions & 0 deletions docs/workloads/async/configuration.md
@@ -32,6 +32,52 @@ predictor:
  shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
```

### TensorFlow Predictor

<!-- CORTEX_VERSION_BRANCH_STABLE x3 -->

```yaml
predictor:
  type: tensorflow
  path: <string> # path to a python file with a TensorFlowPredictor class definition, relative to the Cortex root (required)
  dependencies: # (optional)
    pip: <string> # relative path to requirements.txt (default: requirements.txt)
    conda: <string> # relative path to conda-packages.txt (default: conda-packages.txt)
    shell: <string> # relative path to a shell script for system package installation (default: dependencies.sh)
  models: # (required)
    path: <string> # S3/GCS path to an exported SavedModel directory (e.g. s3://my-bucket/exported_model/) (either this, 'dir', or 'paths' must be provided)
    signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def)
  config: <string: value> # arbitrary dictionary passed to the constructor of the Predictor (optional)
  python_path: <string> # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml)
  image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/tensorflow-predictor:master)
  tensorflow_serving_image: <string> # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-cpu:master, quay.io/cortexlabs/tensorflow-serving-gpu:master, or quay.io/cortexlabs/tensorflow-serving-inf:master based on compute)
  env: <string: string> # dictionary of environment variables
  log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
  shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
```

### ONNX Predictor

<!-- CORTEX_VERSION_BRANCH_STABLE x3 -->

```yaml
predictor:
  type: onnx
  path: <string> # path to a python file with an ONNXPredictor class definition, relative to the Cortex root (required)
  dependencies: # (optional)
    pip: <string> # relative path to requirements.txt (default: requirements.txt)
    conda: <string> # relative path to conda-packages.txt (default: conda-packages.txt)
    shell: <string> # relative path to a shell script for system package installation (default: dependencies.sh)
  models: # (required)
    path: <string> # S3/GCS path to an exported model directory (e.g. s3://my-bucket/exported_model/) (either this, 'dir', or 'paths' must be provided)
  config: <string: value> # arbitrary dictionary passed to the constructor of the Predictor (optional)
  python_path: <string> # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml)
  image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-cpu:master or quay.io/cortexlabs/onnx-predictor-gpu:master based on compute)
  env: <string: string> # dictionary of environment variables
  log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
  shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
```

## Compute

```yaml
119 changes: 119 additions & 0 deletions docs/workloads/async/predictors.md
@@ -88,6 +88,125 @@ learn about how headers can be used to change the type of `payload` that is pass
At this moment, the AsyncAPI `predict` method can only return `JSON`-parseable objects. Navigate to
the [API responses](#api-responses) section to learn about how to configure it.

## TensorFlow Predictor

**Uses TensorFlow version 2.3.0 by default**

### Interface

```python
class TensorFlowPredictor:
    def __init__(self, config, tensorflow_client, metrics_client):
        """(Required) Called once before the API becomes available. Performs
        setup such as downloading/initializing a vocabulary.

        Args:
            config (required): Dictionary passed from API configuration (if
                specified).
            tensorflow_client (required): TensorFlow client which is used to
                make predictions. This should be saved for use in predict().
            metrics_client (optional): The Cortex metrics client, which allows
                you to push custom metrics in order to build custom dashboards
                in Grafana.
        """
        self.client = tensorflow_client
        # Additional initialization may be done here

    def predict(self, payload, request_id):
        """(Required) Called once per request. Preprocesses the request payload
        (if necessary), runs inference (e.g. by calling
        self.client.predict(model_input)), and postprocesses the inference
        output (if necessary).

        Args:
            payload (optional): The request payload (see below for the possible
                payload types).
            request_id (optional): The request id string that identifies a workload.

        Returns:
            Prediction or a batch of predictions.
        """
        pass
```

<!-- CORTEX_VERSION_MINOR -->

Cortex provides a `tensorflow_client` to your Predictor's constructor. `tensorflow_client` is an instance
of [TensorFlowClient](https://github.com/cortexlabs/cortex/tree/master/pkg/cortex/serve/cortex_internal/lib/client/tensorflow.py)
that manages a connection to a TensorFlow Serving container to make predictions using your model. It should be saved as
an instance variable in your Predictor, and your `predict()` function should call `tensorflow_client.predict()` to make
an inference with your exported TensorFlow model. Preprocessing of the JSON payload and postprocessing of predictions
can be implemented in your `predict()` function as well.

For proper separation of concerns, it is recommended to use the constructor's `config` parameter for information such as
from where to download the model and initialization files, or any configurable model parameters. You define `config` in
your API configuration, and it is passed through to your Predictor's constructor.

Your API can accept requests with different types of payloads. Navigate to the [API requests](#api-requests) section to
learn about how headers can be used to change the type of `payload` that is passed into your `predict` method.

At this moment, the AsyncAPI `predict` method can only return `JSON`-parseable objects. Navigate to
the [API responses](#api-responses) section to learn about how to configure it.
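
For illustration, here is a minimal sketch of a `TensorFlowPredictor` implementation. It assumes an iris-style classification model whose output signature includes a `class_ids` tensor, and a hypothetical `labels` entry in `config`; neither assumption is part of this PR, so adapt the pre- and post-processing to your own model:

```python
# A minimal sketch, not part of this PR. Assumes the model's output signature
# contains a "class_ids" tensor and that the API configuration defines a
# hypothetical "labels" list under `config`.
class TensorFlowPredictor:
    def __init__(self, config, tensorflow_client):
        # save the client so predict() can run inference against TensorFlow Serving
        self.client = tensorflow_client
        # "labels" is an illustrative config key, not a Cortex-defined field
        self.labels = config.get("labels", ["setosa", "versicolor", "virginica"])

    def predict(self, payload):
        # payload is assumed to be a JSON dictionary of named model inputs
        prediction = self.client.predict(payload)
        predicted_class_id = int(prediction["class_ids"][0])
        # return a JSON-parseable object, as required by AsyncAPI
        return {"label": self.labels[predicted_class_id]}
```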

## ONNX Predictor

**Uses ONNX Runtime version 1.6.0 by default**

### Interface

```python
class ONNXPredictor:
    def __init__(self, config, onnx_client, metrics_client):
        """(Required) Called once before the API becomes available. Performs
        setup such as downloading/initializing a vocabulary.

        Args:
            onnx_client (required): ONNX client which is used to make
                predictions. This should be saved for use in predict().
            config (required): Dictionary passed from API configuration (if
                specified).
            metrics_client (optional): The Cortex metrics client, which allows
                you to push custom metrics in order to build custom dashboards
                in Grafana.
        """
        self.client = onnx_client
        # Additional initialization may be done here

    def predict(self, payload, request_id):
        """(Required) Called once per request. Preprocesses the request payload
        (if necessary), runs inference (e.g. by calling
        self.client.predict(model_input)), and postprocesses the inference
        output (if necessary).

        Args:
            payload (optional): The request payload (see below for the possible
                payload types).
            request_id (optional): The request id string that identifies a workload.

        Returns:
            Prediction or a batch of predictions.
        """
        pass
```

<!-- CORTEX_VERSION_MINOR -->

Cortex provides an `onnx_client` to your Predictor's constructor. `onnx_client` is an instance
of [ONNXClient](https://github.com/cortexlabs/cortex/tree/master/pkg/cortex/serve/cortex_internal/lib/client/onnx.py)
that manages an ONNX Runtime session to make predictions using your model. It should be saved as an instance variable in
your Predictor, and your `predict()` function should call `onnx_client.predict()` to make an inference with your
exported ONNX model. Preprocessing of the JSON payload and postprocessing of predictions can be implemented in
your `predict()` function as well.

For proper separation of concerns, it is recommended to use the constructor's `config` parameter for information such as
from where to download the model and initialization files, or any configurable model parameters. You define `config` in
your API configuration, and it is passed through to your Predictor's constructor.

Your API can accept requests with different types of payloads. Navigate to the [API requests](#api-requests) section to
learn about how headers can be used to change the type of `payload` that is passed into your `predict` method.

At this moment, the AsyncAPI `predict` method can only return `JSON`-parseable objects. Navigate to
the [API responses](#api-responses) section to learn about how to configure it.
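
As an illustration, here is a minimal sketch of an `ONNXPredictor` (assuming a single-output classification model and a hypothetical `labels` entry in `config`; neither assumption comes from this PR, so adapt it to your model's actual inputs and outputs):

```python
# A minimal sketch, not part of this PR. Assumes the model returns a single
# output tensor of class ids and that the API configuration defines a
# hypothetical "labels" list under `config`.
class ONNXPredictor:
    def __init__(self, config, onnx_client):
        # save the client so predict() can run inference with the ONNX Runtime session
        self.client = onnx_client
        # "labels" is an illustrative config key, not a Cortex-defined field
        self.labels = config.get("labels", [])

    def predict(self, payload):
        # payload is assumed to be a JSON dictionary of named model inputs
        prediction = self.client.predict(payload)
        predicted_class_id = int(prediction[0][0])
        # return a JSON-parseable object, as required by AsyncAPI
        label = self.labels[predicted_class_id] if self.labels else predicted_class_id
        return {"label": label}
```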

## API requests

The type of the `payload` parameter in `predict(self, payload)` can vary based on the content type of the request.
90 changes: 83 additions & 7 deletions pkg/cortex/serve/cortex_internal/lib/api/async.py
@@ -24,9 +24,19 @@
import dill

from cortex_internal.lib.api.validations import validate_class_impl
from cortex_internal.lib.client.onnx import ONNXClient
from cortex_internal.lib.client.tensorflow import TensorFlowClient
from cortex_internal.lib.exceptions import CortexException, UserException, UserRuntimeException
from cortex_internal.lib.metrics import MetricsClient
from cortex_internal.lib.model import ModelsHolder
from cortex_internal.lib.storage import S3
from cortex_internal.lib.type import (
    predictor_type_from_api_spec,
    TensorFlowPredictorType,
    TensorFlowNeuronPredictorType,
    ONNXPredictorType,
    PythonPredictorType,
)

ASYNC_PYTHON_PREDICTOR_VALIDATION = {
"required": [
Expand All @@ -43,21 +53,56 @@
],
}

ASYNC_TENSORFLOW_PREDICTOR_VALIDATION = {
    "required": [
        {
            "name": "__init__",
            "required_args": ["self", "tensorflow_client", "config"],
            "optional_args": ["metrics_client"],
        },
        {
            "name": "predict",
            "required_args": ["self"],
            "optional_args": ["payload", "request_id"],
        },
    ],
}

ASYNC_ONNX_PREDICTOR_VALIDATION = {
    "required": [
        {
            "name": "__init__",
            "required_args": ["self", "onnx_client", "config"],
            "optional_args": ["metrics_client"],
        },
        {
            "name": "predict",
            "required_args": ["self"],
            "optional_args": ["payload", "request_id"],
        },
    ],
}


class AsyncAPI:
    def __init__(
        self,
        api_spec: Dict[str, Any],
        storage: S3,
        storage_path: str,
        model_dir: str,
        statsd_host: str,
        statsd_port: int,
        lock_dir: str = "/run/cron",
    ):
        self.api_spec = api_spec
        self.storage = storage
        self.storage_path = storage_path
        self.path = api_spec["predictor"]["path"]
        self.config = api_spec["predictor"].get("config", {})
        self.type = predictor_type_from_api_spec(api_spec)
        self.model_dir = model_dir
        self.lock_dir = lock_dir

        datadog.initialize(statsd_host=statsd_host, statsd_port=statsd_port)
        self.__statsd = datadog.statsd
@@ -116,7 +161,13 @@ def delete_payload(self, request_id: str):
        key = f"{self.storage_path}/{request_id}/payload"
        self.storage.delete(key)

    def initialize_impl(self, project_dir: str, metrics_client: MetricsClient):
    def initialize_impl(
        self,
        project_dir: str,
        metrics_client: MetricsClient,
        tf_serving_host: str = None,
        tf_serving_port: str = None,
    ):
        predictor_impl = self._get_impl(project_dir)
        constructor_args = inspect.getfullargspec(predictor_impl.__init__).args
        config = deepcopy(self.config)
@@ -127,6 +178,23 @@ def initialize_impl(self, project_dir: str, metrics_client: MetricsClient):
if "metrics_client" in constructor_args:
args["metrics_client"] = metrics_client

        if self.type in [TensorFlowPredictorType, TensorFlowNeuronPredictorType]:
            tf_serving_address = tf_serving_host + ":" + tf_serving_port
            tf_client = TensorFlowClient(
                tf_serving_url=tf_serving_address,
                api_spec=self.api_spec,
            )
            tf_client.sync_models(lock_dir=self.lock_dir)
            args["tensorflow_client"] = tf_client
        elif self.type == ONNXPredictorType:
            models = ModelsHolder(self.type, self.model_dir)
            onnx_client = ONNXClient(
                api_spec=self.api_spec,
                models=models,
                model_dir=self.model_dir,
            )
            args["onnx_client"] = onnx_client

        try:
            predictor = predictor_impl(**args)
        except Exception as e:
@@ -135,16 +203,28 @@ def initialize_impl(self, project_dir: str, metrics_client: MetricsClient):
        return predictor

    def _get_impl(self, project_dir: str):
        if self.type in [TensorFlowPredictorType, TensorFlowNeuronPredictorType]:
            target_class_name = "TensorFlowPredictor"
            validations = ASYNC_TENSORFLOW_PREDICTOR_VALIDATION
        elif self.type == ONNXPredictorType:
            target_class_name = "ONNXPredictor"
            validations = ASYNC_ONNX_PREDICTOR_VALIDATION
        elif self.type == PythonPredictorType:
            target_class_name = "PythonPredictor"
            validations = ASYNC_PYTHON_PREDICTOR_VALIDATION
        else:
            raise CortexException(f"invalid predictor type: {self.type}")

        try:
            impl = self._read_impl(
                "cortex_async_predictor", os.path.join(project_dir, self.path), "PythonPredictor"
                "cortex_async_predictor", os.path.join(project_dir, self.path), target_class_name
            )
        except CortexException as e:
            e.wrap("error in " + self.path)
            raise

        try:
            self._validate_impl(impl)
            validate_class_impl(impl, validations)
        except CortexException as e:
            e.wrap("error in " + self.path)
            raise
@@ -182,7 +262,3 @@ def _read_impl(module_name: str, impl_path: str, target_class_name):
            raise UserException(f"{target_class_name} class is not defined")

        return predictor_class

    @staticmethod
    def _validate_impl(impl):
        return validate_class_impl(impl, ASYNC_PYTHON_PREDICTOR_VALIDATION)
2 changes: 1 addition & 1 deletion pkg/cortex/serve/cortex_internal/lib/api/predictor.py
@@ -224,7 +224,7 @@ def initialize_client(
                self.models_tree,
            )
            if not self.caching_enabled:
                cron = TFSAPIServingThreadUpdater(interval=5.0, client=client._client)
                cron = TFSAPIServingThreadUpdater(interval=5.0, client=client)
                cron.start()

        if self.type == ONNXPredictorType:
Expand Down
5 changes: 2 additions & 3 deletions pkg/cortex/serve/cortex_internal/lib/client/onnx.py
@@ -12,10 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import datetime
import os
import threading as td
import multiprocessing as mp
from typing import Any, Tuple, Optional

try:
@@ -55,7 +54,7 @@ def __init__(
        api_spec: dict,
        models: ModelsHolder,
        model_dir: str,
        models_tree: Optional[ModelsTree],
        models_tree: Optional[ModelsTree] = None,
        lock_dir: Optional[str] = "/run/cron",
    ):
        """