diff --git a/pkg/cortex/serve/start/batch.py b/pkg/cortex/serve/start/batch.py
index 958dbcda51..d0bcc11450 100644
--- a/pkg/cortex/serve/start/batch.py
+++ b/pkg/cortex/serve/start/batch.py
@@ -15,6 +15,7 @@
 import inspect
 import json
 import os
+import sys
 import pathlib
 import threading
 import time
@@ -27,6 +28,7 @@
 from cortex_internal.lib.concurrency import LockedFile
 from cortex_internal.lib.storage import S3
 from cortex_internal.lib.log import configure_logger
+from cortex_internal.lib.exceptions import CortexException
 
 logger = configure_logger("cortex", os.environ["CORTEX_LOG_CONFIG_FILE"])
 
@@ -268,9 +270,8 @@ def handle_on_job_complete(message):
                 break
             should_run_on_job_complete = True
             time.sleep(10)  # verify that the queue is empty one more time
-    except:
-        logger.exception("failed to handle on_job_complete")
-        raise
+    except Exception as err:
+        raise CortexException("failed to handle on_job_complete") from err
     finally:
         with receipt_handle_mutex:
             stop_renewal.add(receipt_handle)
@@ -310,11 +311,19 @@ def start():
     storage, api_spec = get_spec(provider, api_spec_path, cache_dir, region)
     job_spec = get_job_spec(storage, cache_dir, job_spec_path)
 
-    client = api.predictor.initialize_client(
-        tf_serving_host=tf_serving_host, tf_serving_port=tf_serving_port
-    )
-    logger.info("loading the predictor from {}".format(api.predictor.path))
-    predictor_impl = api.predictor.initialize_impl(project_dir, client, job_spec)
+    try:
+        client = api.predictor.initialize_client(
+            tf_serving_host=tf_serving_host, tf_serving_port=tf_serving_port
+        )
+        logger.info("loading the predictor from {}".format(api.predictor.path))
+        predictor_impl = api.predictor.initialize_impl(project_dir, client, job_spec)
+    except CortexException as err:
+        err.wrap(f"failed to start job {job_spec['job_id']}")
+        logger.error(str(err), exc_info=True)
+        sys.exit(1)
+    except Exception:
+        logger.error(f"failed to start job {job_spec['job_id']}", exc_info=True)
+        sys.exit(1)
 
     local_cache["api_spec"] = api
     local_cache["provider"] = provider
@@ -326,7 +335,15 @@ def start():
     open("/mnt/workspace/api_readiness.txt", "a").close()
 
     logger.info("polling for batches...")
-    sqs_loop()
+    try:
+        sqs_loop()
+    except CortexException as err:
+        err.wrap(f"failed to run job {job_spec['job_id']}")
+        logger.error(str(err), exc_info=True)
+        sys.exit(1)
+    except Exception:
+        logger.error(f"failed to run job {job_spec['job_id']}", exc_info=True)
+        sys.exit(1)
 
 
 if __name__ == "__main__":
diff --git a/pkg/cortex/serve/start/task.py b/pkg/cortex/serve/start/task.py
index 7eab8b4dcb..ed4c935643 100644
--- a/pkg/cortex/serve/start/task.py
+++ b/pkg/cortex/serve/start/task.py
@@ -14,6 +14,7 @@
 import json
 import os
+import sys
 from copy import deepcopy
 
 from cortex_internal.lib import util
 
@@ -45,7 +46,11 @@ def start():
     if task_spec is not None and task_spec.get("config") is not None:
         util.merge_dicts_in_place_overwrite(config, task_spec["config"])
 
-    callable_fn(config)
+    try:
+        callable_fn(config)
+    except Exception:
+        logger.error(f"failed to run task {task_spec['job_id']}", exc_info=True)
+        sys.exit(1)
 
 
 if __name__ == "__main__":