From 20a8d9e36bab6b07c8504083a045016b3b2e706e Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 5 Feb 2021 04:28:00 +0200 Subject: [PATCH 1/4] Improve error logging for Task/Batch APIs --- pkg/cortex/serve/start/batch.py | 21 +++++++++++++++------ pkg/cortex/serve/start/task.py | 7 ++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/cortex/serve/start/batch.py b/pkg/cortex/serve/start/batch.py index 958dbcda51..5a49400946 100644 --- a/pkg/cortex/serve/start/batch.py +++ b/pkg/cortex/serve/start/batch.py @@ -15,6 +15,7 @@ import inspect import json import os +import sys import pathlib import threading import time @@ -310,11 +311,15 @@ def start(): storage, api_spec = get_spec(provider, api_spec_path, cache_dir, region) job_spec = get_job_spec(storage, cache_dir, job_spec_path) - client = api.predictor.initialize_client( - tf_serving_host=tf_serving_host, tf_serving_port=tf_serving_port - ) - logger.info("loading the predictor from {}".format(api.predictor.path)) - predictor_impl = api.predictor.initialize_impl(project_dir, client, job_spec) + try: + client = api.predictor.initialize_client( + tf_serving_host=tf_serving_host, tf_serving_port=tf_serving_port + ) + logger.info("loading the predictor from {}".format(api.predictor.path)) + predictor_impl = api.predictor.initialize_impl(project_dir, client, job_spec) + except: + logger.error(f"failed to start job {job_spec['job_id']}", exc_info=True) + sys.exit(1) local_cache["api_spec"] = api local_cache["provider"] = provider @@ -326,7 +331,11 @@ def start(): open("/mnt/workspace/api_readiness.txt", "a").close() logger.info("polling for batches...") - sqs_loop() + try: + sqs_loop() + except: + logger.error(f"failed to run job {job_spec['job_id']}", exc_info=True) + sys.exit(1) if __name__ == "__main__": diff --git a/pkg/cortex/serve/start/task.py b/pkg/cortex/serve/start/task.py index 7eab8b4dcb..ed4c935643 100644 --- a/pkg/cortex/serve/start/task.py +++ b/pkg/cortex/serve/start/task.py @@ -14,6 +14,7 @@ import json import os +import sys from copy import deepcopy from cortex_internal.lib import util @@ -45,7 +46,11 @@ def start(): if task_spec is not None and task_spec.get("config") is not None: util.merge_dicts_in_place_overwrite(config, task_spec["config"]) - callable_fn(config) + try: + callable_fn(config) + except: + logger.error(f"failed to run task {task_spec['job_id']}", exc_info=True) + sys.exit(1) if __name__ == "__main__": From ae26c88a7a24bee81f777db7b6cdd64006170f68 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 5 Feb 2021 17:16:50 +0200 Subject: [PATCH 2/4] Address PR comments --- pkg/cortex/serve/start/batch.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/cortex/serve/start/batch.py b/pkg/cortex/serve/start/batch.py index 5a49400946..7c135c6456 100644 --- a/pkg/cortex/serve/start/batch.py +++ b/pkg/cortex/serve/start/batch.py @@ -270,8 +270,7 @@ def handle_on_job_complete(message): should_run_on_job_complete = True time.sleep(10) # verify that the queue is empty one more time except: - logger.exception("failed to handle on_job_complete") - raise + raise Exception("failed to handle on_job_complete") finally: with receipt_handle_mutex: stop_renewal.add(receipt_handle) @@ -317,8 +316,8 @@ def start(): ) logger.info("loading the predictor from {}".format(api.predictor.path)) predictor_impl = api.predictor.initialize_impl(project_dir, client, job_spec) - except: - logger.error(f"failed to start job {job_spec['job_id']}", exc_info=True) + except Exception as err: + logger.error(f"failed to start job {job_spec['job_id']}: {err.message}", exc_info=True) sys.exit(1) local_cache["api_spec"] = api @@ -333,8 +332,8 @@ def start(): logger.info("polling for batches...") try: sqs_loop() - except: - logger.error(f"failed to run job {job_spec['job_id']}", exc_info=True) + except Exception as err: + logger.error(f"failed to run job {job_spec['job_id']}: {err.message}", exc_info=True) sys.exit(1) From c57c33ddd4f6c6f45a79e1eabfc260bf21aeb6c3 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 5 Feb 2021 19:04:36 +0200 Subject: [PATCH 3/4] Handle CortexException exceptions --- pkg/cortex/serve/start/batch.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/cortex/serve/start/batch.py b/pkg/cortex/serve/start/batch.py index 7c135c6456..80c7867124 100644 --- a/pkg/cortex/serve/start/batch.py +++ b/pkg/cortex/serve/start/batch.py @@ -28,6 +28,7 @@ from cortex_internal.lib.concurrency import LockedFile from cortex_internal.lib.storage import S3 from cortex_internal.lib.log import configure_logger +from cortex_internal.lib.exceptions import CortexException logger = configure_logger("cortex", os.environ["CORTEX_LOG_CONFIG_FILE"]) @@ -270,7 +271,7 @@ def handle_on_job_complete(message): should_run_on_job_complete = True time.sleep(10) # verify that the queue is empty one more time except: - raise Exception("failed to handle on_job_complete") + raise CortexException("failed to handle on_job_complete") finally: with receipt_handle_mutex: stop_renewal.add(receipt_handle) @@ -316,8 +317,12 @@ def start(): ) logger.info("loading the predictor from {}".format(api.predictor.path)) predictor_impl = api.predictor.initialize_impl(project_dir, client, job_spec) - except Exception as err: - logger.error(f"failed to start job {job_spec['job_id']}: {err.message}", exc_info=True) + except CortexException as err: + err.wrap(f"failed to start job {job_spec['job_id']}") + logger.error(str(err), exc_info=True) + sys.exit(1) + except: + logger.error(f"failed to start job {job_spec['job_id']}", exc_info=True) sys.exit(1) local_cache["api_spec"] = api @@ -332,8 +337,12 @@ def start(): logger.info("polling for batches...") try: sqs_loop() - except Exception as err: - logger.error(f"failed to run job {job_spec['job_id']}: {err.message}", exc_info=True) + except CortexException as err: + err.wrap(f"failed to run job {job_spec['job_id']}") + logger.error(str(err), exc_info=True) + sys.exit(1) + except: + logger.error(f"failed to run job {job_spec['job_id']}", exc_info=True) sys.exit(1) From 295c9ab72003a226f4eabcda93755881921ff28e Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Thu, 11 Feb 2021 01:19:51 +0200 Subject: [PATCH 4/4] Update batch.py --- pkg/cortex/serve/start/batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cortex/serve/start/batch.py b/pkg/cortex/serve/start/batch.py index 80c7867124..d0bcc11450 100644 --- a/pkg/cortex/serve/start/batch.py +++ b/pkg/cortex/serve/start/batch.py @@ -270,8 +270,8 @@ def handle_on_job_complete(message): break should_run_on_job_complete = True time.sleep(10) # verify that the queue is empty one more time - except: - raise CortexException("failed to handle on_job_complete") + except Exception as err: + raise CortexException("failed to handle on_job_complete") from err finally: with receipt_handle_mutex: stop_renewal.add(receipt_handle)