diff --git a/dev/versions.md b/dev/versions.md index c04bb4af42..57df5fff6b 100644 --- a/dev/versions.md +++ b/dev/versions.md @@ -103,6 +103,7 @@ Note: it's ok if example training notebooks aren't upgraded, as long as the expo * [flask](https://pypi.org/project/flask/) * [flask-api](https://pypi.org/project/flask-api/) * [waitress](https://pypi.org/project/waitress/) + * [dill](https://pypi.org/project/dill/) 1. Update the versions listed in "Pre-installed Packages" in `request-handlers.py` ## Istio diff --git a/docs/cluster/python-client.md b/docs/cluster/python-client.md new file mode 100644 index 0000000000..d8906964d3 --- /dev/null +++ b/docs/cluster/python-client.md @@ -0,0 +1,45 @@ +# Python Client + +The Python client can be used to programmatically deploy models to a Cortex Cluster. + + +``` +pip install git+https://github.com/cortexlabs/cortex.git@master#egg=cortex\&subdirectory=pkg/workloads/cortex/client +``` + +The Python client needs to be initialized with AWS credentials and an operator URL for your Cortex cluster. You can find the operator URL by running `./cortex.sh endpoints`. + +```python +from cortex import Client + +cortex = Client( + aws_access_key_id="", # AWS access key associated with the account that the cluster is running on + aws_secret_access_key="", # AWS secret key associated with the AWS access key + operator_url="" # operator URL of your cluster +) + +api_url = cortex.deploy( + deployment_name="", # deployment name (required) + api_name="", # API name (required) + model_path="", # S3 path to an exported model (required) + pre_inference=callable, # function used to prepare requests for model input + post_inference=callable, # function used to prepare model output for response + model_format="", # model format, must be "tensorflow" or "onnx" (default: "onnx" if model path ends with .onnx, "tensorflow" if model path ends with .zip or is a directory) + tf_serving_key="" # name of the signature def to use for prediction (required if your model has more than one signature def) +) +``` + +`api_url` contains the URL of the deployed API. The API accepts JSON POST requests. 
+ +```python +import requests + +sample = { + "feature_1": 'a', + "feature_2": 'b', + "feature_3": 'c' +} + +resp = requests.post(api_url, json=sample) +resp.json() +``` diff --git a/pkg/operator/api/schema/schema.go b/pkg/operator/api/schema/schema.go index 1ded820ae0..05735d3291 100644 --- a/pkg/operator/api/schema/schema.go +++ b/pkg/operator/api/schema/schema.go @@ -24,7 +24,9 @@ import ( ) type DeployResponse struct { - Message string `json:"message"` + Message string `json:"message"` + Context *context.Context `json:"context"` + APIsBaseURL string `json:"apis_base_url"` } type DeleteResponse struct { diff --git a/pkg/operator/endpoints/deploy.go b/pkg/operator/endpoints/deploy.go index 968563c7ec..5a39bdaf9a 100644 --- a/pkg/operator/endpoints/deploy.go +++ b/pkg/operator/endpoints/deploy.go @@ -117,20 +117,29 @@ func Deploy(w http.ResponseWriter, r *http.Request) { return } + apisBaseURL, err := workloads.APIsBaseURL() + if err != nil { + RespondError(w, err) + return + } + + deployResponse := schema.DeployResponse{Context: ctx, APIsBaseURL: apisBaseURL} switch { case isUpdating && ignoreCache: - Respond(w, schema.DeployResponse{Message: ResCachedDeletedDeploymentStarted}) + deployResponse.Message = ResCachedDeletedDeploymentStarted case isUpdating && !ignoreCache: - Respond(w, schema.DeployResponse{Message: ResDeploymentUpdated}) + deployResponse.Message = ResDeploymentUpdated case !isUpdating && ignoreCache: - Respond(w, schema.DeployResponse{Message: ResCachedDeletedDeploymentStarted}) + deployResponse.Message = ResCachedDeletedDeploymentStarted case !isUpdating && !ignoreCache && existingCtx == nil: - Respond(w, schema.DeployResponse{Message: ResDeploymentStarted}) + deployResponse.Message = ResDeploymentStarted case !isUpdating && !ignoreCache && existingCtx != nil && !fullCtxMatch: - Respond(w, schema.DeployResponse{Message: ResDeploymentUpdated}) + deployResponse.Message = ResDeploymentUpdated case !isUpdating && !ignoreCache && existingCtx != nil && fullCtxMatch: - Respond(w, schema.DeployResponse{Message: ResDeploymentUpToDate}) + deployResponse.Message = ResDeploymentUpToDate default: - Respond(w, schema.DeployResponse{Message: ResDeploymentUpdated}) + deployResponse.Message = ResDeploymentUpdated } + + Respond(w, deployResponse) } diff --git a/pkg/workloads/cortex/client/cortex/__init__.py b/pkg/workloads/cortex/client/cortex/__init__.py new file mode 100644 index 0000000000..4005eec0fa --- /dev/null +++ b/pkg/workloads/cortex/client/cortex/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2019 Cortex Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cortex.client import Client diff --git a/pkg/workloads/cortex/client/cortex/client.py b/pkg/workloads/cortex/client/cortex/client.py new file mode 100644 index 0000000000..3b5058cb5a --- /dev/null +++ b/pkg/workloads/cortex/client/cortex/client.py @@ -0,0 +1,144 @@ +# Copyright 2019 Cortex Labs, Inc. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pathlib
+from pathlib import Path
+import os
+import types
+import subprocess
+import sys
+import shutil
+import yaml
+import urllib.parse
+import base64
+
+import dill
+import requests
+from requests.exceptions import HTTPError
+import msgpack
+
+
+class Client(object):
+    def __init__(self, aws_access_key_id, aws_secret_access_key, operator_url):
+        """Initialize a Client to a Cortex Operator
+
+        Args:
+            aws_access_key_id (string): AWS access key associated with the account that the cluster is running on
+            aws_secret_access_key (string): AWS secret key associated with the AWS access key
+            operator_url (string): operator URL of your cluster
+        """
+
+        self.operator_url = operator_url
+        self.workspace = str(Path.home() / ".cortex" / "workspace")
+        self.aws_access_key_id = aws_access_key_id
+        self.aws_secret_access_key = aws_secret_access_key
+        self.headers = {
+            "CortexAPIVersion": "master",
+            "Authorization": "CortexAWS {}|{}".format(
+                self.aws_access_key_id, self.aws_secret_access_key
+            ),
+        }
+
+        pathlib.Path(self.workspace).mkdir(parents=True, exist_ok=True)
+
+    def deploy(
+        self,
+        deployment_name,
+        api_name,
+        model_path,
+        pre_inference=None,
+        post_inference=None,
+        model_format=None,
+        tf_serving_key=None,
+    ):
+        """Deploy an API
+
+        Args:
+            deployment_name (string): deployment name
+            api_name (string): API name
+            model_path (string): S3 path to an exported model
+            pre_inference (function, optional): function used to prepare requests for model input
+            post_inference (function, optional): function used to prepare model output for response
+            model_format (string, optional): model format, must be "tensorflow" or "onnx" (default: "onnx" if model path ends with .onnx, "tensorflow" if model path ends with .zip or is a directory)
+            tf_serving_key (string, optional): name of the signature def to use for prediction (required if your model has more than one signature def)
+
+        Returns:
+            string: url to the deployed API
+        """
+
+        working_dir = os.path.join(self.workspace, deployment_name)
+        api_working_dir = os.path.join(working_dir, api_name)
+        pathlib.Path(api_working_dir).mkdir(parents=True, exist_ok=True)
+
+        api_config = {"kind": "api", "model": model_path, "name": api_name}
+
+        if tf_serving_key is not None:
+            api_config["tf_serving_key"] = tf_serving_key
+
+        if model_format is not None:
+            api_config["model_format"] = model_format
+
+        if pre_inference is not None or post_inference is not None:
+            reqs = subprocess.check_output([sys.executable, "-m", "pip", "freeze"])
+
+            with open(os.path.join(api_working_dir, "requirements.txt"), "w") as f:
+                f.writelines(reqs.decode())
+
+            handlers = {}
+
+            if pre_inference is not None:
+                handlers["pre_inference"] = pre_inference
+
+            if post_inference is not None:
+                handlers["post_inference"] = post_inference
+
+            with open(os.path.join(api_working_dir, "request_handler.pickle"), "wb") as f:
+                dill.dump(handlers, f, recurse=True)
+
+            api_config["request_handler"] = "request_handler.pickle"
+
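+        # request_handler.pickle is loaded with dill by load_module() in
+        # pkg/workloads/cortex/lib/context.py (also updated in this diff), which
+        # exposes pre_inference/post_inference to the serving workload.
+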
cortex_config = [{"kind": "deployment", "name": deployment_name}, api_config] + + cortex_yaml_path = os.path.join(working_dir, "cortex.yaml") + with open(cortex_yaml_path, "w") as f: + f.write(yaml.dump(cortex_config)) + + project_zip_path = os.path.join(working_dir, "project") + shutil.make_archive(project_zip_path, "zip", api_working_dir) + project_zip_path += ".zip" + + queries = {"force": "false", "ignoreCache": "false"} + + with open(cortex_yaml_path, "rb") as config, open(project_zip_path, "rb") as project: + files = {"cortex.yaml": config, "project.zip": project} + try: + resp = requests.post( + urllib.parse.urljoin(self.operator_url, "deploy"), + params=queries, + files=files, + headers=self.headers, + verify=False, + ) + resp.raise_for_status() + resources = resp.json() + except HTTPError as err: + resp = err.response + if "error" in resp.json(): + raise Exception(resp.json()["error"]) from err + raise + + b64_encoded_context = resources["context"] + context_msgpack_bytestring = base64.b64decode(b64_encoded_context) + ctx = msgpack.loads(context_msgpack_bytestring, raw=False) + return urllib.parse.urljoin(resources["apis_base_url"], ctx["apis"][api_name]["path"]) diff --git a/pkg/workloads/cortex/client/setup.py b/pkg/workloads/cortex/client/setup.py new file mode 100644 index 0000000000..bd00510ca8 --- /dev/null +++ b/pkg/workloads/cortex/client/setup.py @@ -0,0 +1,26 @@ +# Copyright 2019 Cortex Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from setuptools import setup, find_packages + +setup( + name="cortex", + version="0.8.0", + description="", + author="Cortex Labs", + author_email="dev@cortexlabs.com", + install_requires=["dill>=0.3.0", "requests>=2.20.0", "msgpack>=0.6.0"], + setup_requires=["setuptools"], + packages=find_packages(), +) diff --git a/pkg/workloads/cortex/lib/context.py b/pkg/workloads/cortex/lib/context.py index 1106b58deb..e10617fdd9 100644 --- a/pkg/workloads/cortex/lib/context.py +++ b/pkg/workloads/cortex/lib/context.py @@ -15,8 +15,10 @@ import os import imp import inspect + import boto3 import datadog +import dill from cortex import consts from cortex.lib import util @@ -108,10 +110,22 @@ def download_python_file(self, impl_key, module_name): def load_module(self, module_prefix, module_name, impl_path): full_module_name = "{}_{}".format(module_prefix, module_name) - try: - impl = imp.load_source(full_module_name, impl_path) - except Exception as e: - raise UserException("unable to load python file", str(e)) from e + + if impl_path.endswith(".pickle"): + try: + impl = imp.new_module(full_module_name) + + with open(impl_path, "rb") as pickle_file: + pickled_dict = dill.load(pickle_file) + for key in pickled_dict: + setattr(impl, key, pickled_dict[key]) + except Exception as e: + raise UserException("unable to load pickle", str(e)) from e + else: + try: + impl = imp.load_source(full_module_name, impl_path) + except Exception as e: + raise UserException("unable to load python file", str(e)) from e return impl diff --git a/pkg/workloads/cortex/lib/requirements.txt b/pkg/workloads/cortex/lib/requirements.txt index 3a721783a3..fda1ae01b3 100644 --- a/pkg/workloads/cortex/lib/requirements.txt +++ b/pkg/workloads/cortex/lib/requirements.txt @@ -4,3 +4,4 @@ numpy==1.17.2 json_tricks==3.13.2 requests==2.22.0 datadog==0.30.0 +dill==0.3.0 diff --git a/pkg/workloads/cortex/lib/util.py b/pkg/workloads/cortex/lib/util.py index e28685a1f0..3f4f6244da 100644 --- a/pkg/workloads/cortex/lib/util.py +++ b/pkg/workloads/cortex/lib/util.py @@ -23,6 +23,7 @@ import zipfile import hashlib import msgpack +import pathlib from copy import deepcopy from datetime import datetime @@ -62,13 +63,7 @@ def snake_to_camel(input, sep="_", lower=True): def mkdir_p(dir_path): - try: - os.makedirs(dir_path) - except OSError as e: - if e.errno == errno.EEXIST and os.path.isdir(dir_path): - pass - else: - raise + pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True) def rm_dir(dir_path):
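
For reference, a minimal usage sketch tying the Python client and dill-pickled request handlers together. The handler signatures, credentials, names, model path, and feature keys below are illustrative assumptions, not part of this change:

```python
from cortex import Client

# Hypothetical request handlers; the (sample, metadata) / (prediction, metadata)
# signatures are assumed here for illustration.
def pre_inference(sample, metadata):
    # reorder the incoming JSON into the shape the model expects
    return {"input": [sample["feature_1"], sample["feature_2"], sample["feature_3"]]}

def post_inference(prediction, metadata):
    # wrap the raw model output before it is returned to the caller
    return {"prediction": prediction}

cortex = Client(
    aws_access_key_id="<aws access key>",
    aws_secret_access_key="<aws secret key>",
    operator_url="<operator URL>",  # from ./cortex.sh endpoints
)

api_url = cortex.deploy(
    deployment_name="my-deployment",       # hypothetical names and paths
    api_name="my-api",
    model_path="s3://my-bucket/model.zip",
    pre_inference=pre_inference,
    post_inference=post_inference,
    model_format="tensorflow",
)

print(api_url)  # URL that accepts JSON POST requests
```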