From 77e0d52e838d75d0024e88925732a65c3aa89dd1 Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Tue, 14 Dec 2021 16:08:55 -0500 Subject: [PATCH 1/7] made servo issue a startup event when a run gets cancelled; made the PrometheusConnector.startup() idempotent --- servo/connectors/prometheus.py | 45 ++++++----- servo/runner.py | 140 +++++++++++++++++++-------------- 2 files changed, 106 insertions(+), 79 deletions(-) diff --git a/servo/connectors/prometheus.py b/servo/connectors/prometheus.py index 388a5fa06..096386949 100644 --- a/servo/connectors/prometheus.py +++ b/servo/connectors/prometheus.py @@ -778,26 +778,31 @@ async def startup(self) -> None: logger = servo.logger.bind(component=f"{self.name} -> {CHANNEL}") logger.info(f"Streaming Prometheus metrics every {streaming_interval}") - @self.publish(CHANNEL, every=streaming_interval) - async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None: - report = [] - client = Client(base_url=self.config.base_url) - responses = await asyncio.gather( - *list(map(client.query, self.config.metrics)), - return_exceptions=True - ) - for response in responses: - if isinstance(response, Exception): - logger.error(f"failed querying Prometheus for metrics: {response}") - continue - - if response.data: - # NOTE: Instant queries return a single vector - timestamp, value = response.data[0].value - report.append((response.metric.name, timestamp.isoformat(), value)) - - await publisher(servo.pubsub.Message(json=report)) - logger.debug(f"Published {len(report)} metrics.") + try: + @self.publish(CHANNEL, every=streaming_interval) + async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None: + report = [] + client = Client(base_url=self.config.base_url) + responses = await asyncio.gather( + *list(map(client.query, self.config.metrics)), + return_exceptions=True + ) + for response in responses: + if isinstance(response, Exception): + logger.error(f"failed querying Prometheus for metrics: {response}") + continue + if response.data: + # NOTE: Instant queries return a single vector + timestamp, value = response.data[0].value + report.append((response.metric.name, timestamp.isoformat(), value)) + await publisher(servo.pubsub.Message(json=report)) + logger.debug(f"Published {len(report)} metrics.") + except KeyError as e: + # Handling this KeyError makes self.startup() idempotent. + # This KeyError will be raised when self.startup() is executed more than once. The + # exception is raised from the registration of the above function as a publisher - + # the KeyError indicates that a publisher with the same name already exists. + self.logger.debug(f"{type(e)}: {str(e)})") @servo.on_event() async def check( diff --git a/servo/runner.py b/servo/runner.py index fa3695b6f..11fc6518b 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -1,5 +1,6 @@ from __future__ import annotations +import uuid import asyncio import functools import os @@ -123,77 +124,84 @@ async def exec_command(self) -> servo.api.Status: self.logger.info(f"What's Next? => {cmd_response.command}") self.logger.trace(devtools.pformat(cmd_response)) - if cmd_response.command == servo.api.Commands.describe: - description = await self.describe(Control(**cmd_response.param.get("control", {}))) - self.logger.success( - f"Described: {len(description.components)} components, {len(description.metrics)} metrics" - ) - self.logger.debug(devtools.pformat(description)) - - status = servo.api.Status.ok(descriptor=description.__opsani_repr__()) - return await self._post_event(servo.api.Events.describe, status.dict()) - - elif cmd_response.command == servo.api.Commands.measure: - try: - measurement = await self.measure(cmd_response.param) + try: + if cmd_response.command == servo.api.Commands.describe: + description = await self.describe(Control(**cmd_response.param.get("control", {}))) self.logger.success( - f"Measured: {len(measurement.readings)} readings, {len(measurement.annotations)} annotations" + f"Described: {len(description.components)} components, {len(description.metrics)} metrics" ) - self.logger.trace(devtools.pformat(measurement)) - param = measurement.__opsani_repr__() - except servo.errors.EventError as error: - self.logger.error(f"Measurement failed: {error}") - param = servo.api.Status.from_error(error).dict() - self.logger.error(f"Responding with {param}") - self.logger.opt(exception=error).debug("Measure failure details") + self.logger.debug(devtools.pformat(description)) - return await self._post_event(servo.api.Events.measure, param) + status = servo.api.Status.ok(descriptor=description.__opsani_repr__()) + return await self._post_event(servo.api.Events.describe, status.dict()) - elif cmd_response.command == servo.api.Commands.adjust: - adjustments = servo.api.descriptor_to_adjustments(cmd_response.param["state"]) - control = Control(**cmd_response.param.get("control", {})) + elif cmd_response.command == servo.api.Commands.measure: + try: + measurement = await self.measure(cmd_response.param) + self.logger.success( + f"Measured: {len(measurement.readings)} readings, {len(measurement.annotations)} annotations" + ) + self.logger.trace(devtools.pformat(measurement)) + param = measurement.__opsani_repr__() + except servo.errors.EventError as error: + self.logger.error(f"Measurement failed: {error}") + param = servo.api.Status.from_error(error).dict() + self.logger.error(f"Responding with {param}") + self.logger.opt(exception=error).debug("Measure failure details") + + return await self._post_event(servo.api.Events.measure, param) + + elif cmd_response.command == servo.api.Commands.adjust: + adjustments = servo.api.descriptor_to_adjustments(cmd_response.param["state"]) + control = Control(**cmd_response.param.get("control", {})) + + try: + description = await self.adjust(adjustments, control) + status = servo.api.Status.ok(state=description.__opsani_repr__()) + + components_count = len(description.components) + settings_count = sum( + len(component.settings) for component in description.components + ) + self.logger.success( + f"Adjusted: {components_count} components, {settings_count} settings" + ) + except servo.EventError as error: + self.logger.error(f"Adjustment failed: {error}") + status = servo.api.Status.from_error(error) + self.logger.error(f"Responding with {status.dict()}") + self.logger.opt(exception=error).debug("Adjust failure details") - try: - description = await self.adjust(adjustments, control) - status = servo.api.Status.ok(state=description.__opsani_repr__()) + return await self._post_event(servo.api.Events.adjust, status.dict()) - components_count = len(description.components) - settings_count = sum( - len(component.settings) for component in description.components + elif cmd_response.command == servo.api.Commands.sleep: + # TODO: Model this + duration = Duration(cmd_response.param.get("duration", 120)) + status = servo.utilities.key_paths.value_for_key_path(cmd_response.param, "data.status", None) + reason = servo.utilities.key_paths.value_for_key_path( + cmd_response.param, "data.reason", "unknown reason" ) - self.logger.success( - f"Adjusted: {components_count} components, {settings_count} settings" - ) - except servo.EventError as error: - self.logger.error(f"Adjustment failed: {error}") - status = servo.api.Status.from_error(error) - self.logger.error(f"Responding with {status.dict()}") - self.logger.opt(exception=error).debug("Adjust failure details") - - return await self._post_event(servo.api.Events.adjust, status.dict()) - - elif cmd_response.command == servo.api.Commands.sleep: - # TODO: Model this - duration = Duration(cmd_response.param.get("duration", 120)) - status = servo.utilities.key_paths.value_for_key_path(cmd_response.param, "data.status", None) - reason = servo.utilities.key_paths.value_for_key_path( - cmd_response.param, "data.reason", "unknown reason" - ) - msg = f"{status}: {reason}" if status else f"{reason}" - self.logger.info(f"Sleeping for {duration} ({msg}).") - await asyncio.sleep(duration.total_seconds()) + msg = f"{status}: {reason}" if status else f"{reason}" + self.logger.info(f"Sleeping for {duration} ({msg}).") + await asyncio.sleep(duration.total_seconds()) - # Return a status so we have a simple API contract - return servo.api.Status(status="ok", message=msg) - else: - raise ValueError(f"Unknown command '{cmd_response.command.value}'") + # Return a status so we have a simple API contract + return servo.api.Status(status="ok", message=msg) + else: + raise ValueError(f"Unknown command '{cmd_response.command.value}'") + finally: + self.logger.debug("at tail of runner.exec_command") # Main run loop for processing commands from the optimizer + # NOTE this is not an asyncio.loop, just a standard main() function async def main_loop(self) -> None: # FIXME: We have seen exceptions from using `with self.servo.current()` crossing contexts + self.logger.debug("setting servo") _set_current_servo(self.servo) + self.logger.debug("set servo") while self._running: + self.logger.debug("still runner._running") try: if self.interactive: if not typer.confirm("Poll for next command?"): @@ -201,7 +209,9 @@ async def main_loop(self) -> None: await asyncio.sleep(60) continue + self.logger.debug("awaiting exec_command") status = await self.exec_command() + self.logger.debug("awaited exec_command") if status.status == servo.api.OptimizerStatuses.unexpected_event: self.logger.warning( f"server reported unexpected event: {status.reason}" @@ -219,8 +229,13 @@ async def main_loop(self) -> None: raise error def run_main_loop(self) -> None: - if self._main_loop_task: + if self._main_loop_task: # i.e. if this isn't the first time the run_main_loop() has been called self._main_loop_task.cancel() + self.logger.debug("old main_loop_task cancelled") + if self.servo is not None: + loop = asyncio.get_event_loop() + loop.create_task(self.servo.dispatch_event(servo.Events.startup)) + self.logger.debug("startup Event dispatched") def _reraise_if_necessary(task: asyncio.Task) -> None: try: @@ -231,8 +246,12 @@ def _reraise_if_necessary(task: asyncio.Task) -> None: self.logger.opt(exception=error).trace(f"Exception raised by task {task}") raise error # Ensure that we surface the error for handling - self._main_loop_task = asyncio.create_task(self.main_loop(), name=f"main loop for servo {self.optimizer.id}") + _uid = str(uuid.uuid4())[-6:] + main_loop_name = f"main loop for servo {self.optimizer.id} {_uid}" + self.logger.debug(f"creating new main loop: {main_loop_name}") + self._main_loop_task = asyncio.create_task(self.main_loop(), name=main_loop_name) self._main_loop_task.add_done_callback(_reraise_if_necessary) + self.logger.debug(f"created new main loop: {main_loop_name}") async def run(self, *, poll: bool = True) -> None: self._running = True @@ -304,6 +323,7 @@ class Config: arbitrary_types_allowed = True def __init__(self, assembly: servo.Assembly, **kwargs) -> None: + # This allows to be passed as a positional argument super().__init__(assembly=assembly, **kwargs) def _runner_for_servo(self, servo: servo.Servo) -> ServoRunner: @@ -371,9 +391,12 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception) t for t in asyncio.all_tasks() if t is not asyncio.current_task() ] self.logger.info(f"Cancelling {len(tasks)} outstanding tasks") + [self.logger.debug(f"\t{task.get_name()}") for task in tasks] [task.cancel() for task in tasks] await asyncio.gather(*tasks, return_exceptions=True) + self.logger.trace("Cancelled tasks:") + [self.logger.trace(f"\t{task.get_name()} {'cancelled' if task.cancelled() else 'not cancelled'}") for task in tasks] # Restart a fresh main loop if poll: @@ -397,7 +420,6 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception) servo_runner = ServoRunner(servo_, interactive=interactive) loop.create_task(servo_runner.run(poll=poll)) self.runners.append(servo_runner) - loop.run_forever() finally: From 471d82b07c6e3c5db138707a38bbdf620f89c5f6 Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Tue, 14 Dec 2021 16:20:54 -0500 Subject: [PATCH 2/7] using targeted conditional rather than relying on an exception that could have multiple sources --- servo/connectors/prometheus.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/servo/connectors/prometheus.py b/servo/connectors/prometheus.py index 096386949..e87859969 100644 --- a/servo/connectors/prometheus.py +++ b/servo/connectors/prometheus.py @@ -777,8 +777,7 @@ async def startup(self) -> None: if streaming_interval is not None: logger = servo.logger.bind(component=f"{self.name} -> {CHANNEL}") logger.info(f"Streaming Prometheus metrics every {streaming_interval}") - - try: + if self.pubsub_exchange.get_channel(CHANNEL) is None: @self.publish(CHANNEL, every=streaming_interval) async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None: report = [] @@ -797,12 +796,6 @@ async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None: report.append((response.metric.name, timestamp.isoformat(), value)) await publisher(servo.pubsub.Message(json=report)) logger.debug(f"Published {len(report)} metrics.") - except KeyError as e: - # Handling this KeyError makes self.startup() idempotent. - # This KeyError will be raised when self.startup() is executed more than once. The - # exception is raised from the registration of the above function as a publisher - - # the KeyError indicates that a publisher with the same name already exists. - self.logger.debug(f"{type(e)}: {str(e)})") @servo.on_event() async def check( From e3b7b360a5e89c8bce89ad640cbd0f054ac860fd Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Tue, 14 Dec 2021 16:22:55 -0500 Subject: [PATCH 3/7] removed a try-finally debugging block --- servo/runner.py | 115 +++++++++++++++++++++++------------------------- 1 file changed, 56 insertions(+), 59 deletions(-) diff --git a/servo/runner.py b/servo/runner.py index 11fc6518b..471ffc18c 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -124,73 +124,70 @@ async def exec_command(self) -> servo.api.Status: self.logger.info(f"What's Next? => {cmd_response.command}") self.logger.trace(devtools.pformat(cmd_response)) - try: - if cmd_response.command == servo.api.Commands.describe: - description = await self.describe(Control(**cmd_response.param.get("control", {}))) + if cmd_response.command == servo.api.Commands.describe: + description = await self.describe(Control(**cmd_response.param.get("control", {}))) + self.logger.success( + f"Described: {len(description.components)} components, {len(description.metrics)} metrics" + ) + self.logger.debug(devtools.pformat(description)) + + status = servo.api.Status.ok(descriptor=description.__opsani_repr__()) + return await self._post_event(servo.api.Events.describe, status.dict()) + + elif cmd_response.command == servo.api.Commands.measure: + try: + measurement = await self.measure(cmd_response.param) self.logger.success( - f"Described: {len(description.components)} components, {len(description.metrics)} metrics" + f"Measured: {len(measurement.readings)} readings, {len(measurement.annotations)} annotations" ) - self.logger.debug(devtools.pformat(description)) + self.logger.trace(devtools.pformat(measurement)) + param = measurement.__opsani_repr__() + except servo.errors.EventError as error: + self.logger.error(f"Measurement failed: {error}") + param = servo.api.Status.from_error(error).dict() + self.logger.error(f"Responding with {param}") + self.logger.opt(exception=error).debug("Measure failure details") - status = servo.api.Status.ok(descriptor=description.__opsani_repr__()) - return await self._post_event(servo.api.Events.describe, status.dict()) + return await self._post_event(servo.api.Events.measure, param) - elif cmd_response.command == servo.api.Commands.measure: - try: - measurement = await self.measure(cmd_response.param) - self.logger.success( - f"Measured: {len(measurement.readings)} readings, {len(measurement.annotations)} annotations" - ) - self.logger.trace(devtools.pformat(measurement)) - param = measurement.__opsani_repr__() - except servo.errors.EventError as error: - self.logger.error(f"Measurement failed: {error}") - param = servo.api.Status.from_error(error).dict() - self.logger.error(f"Responding with {param}") - self.logger.opt(exception=error).debug("Measure failure details") - - return await self._post_event(servo.api.Events.measure, param) - - elif cmd_response.command == servo.api.Commands.adjust: - adjustments = servo.api.descriptor_to_adjustments(cmd_response.param["state"]) - control = Control(**cmd_response.param.get("control", {})) - - try: - description = await self.adjust(adjustments, control) - status = servo.api.Status.ok(state=description.__opsani_repr__()) - - components_count = len(description.components) - settings_count = sum( - len(component.settings) for component in description.components - ) - self.logger.success( - f"Adjusted: {components_count} components, {settings_count} settings" - ) - except servo.EventError as error: - self.logger.error(f"Adjustment failed: {error}") - status = servo.api.Status.from_error(error) - self.logger.error(f"Responding with {status.dict()}") - self.logger.opt(exception=error).debug("Adjust failure details") + elif cmd_response.command == servo.api.Commands.adjust: + adjustments = servo.api.descriptor_to_adjustments(cmd_response.param["state"]) + control = Control(**cmd_response.param.get("control", {})) - return await self._post_event(servo.api.Events.adjust, status.dict()) + try: + description = await self.adjust(adjustments, control) + status = servo.api.Status.ok(state=description.__opsani_repr__()) - elif cmd_response.command == servo.api.Commands.sleep: - # TODO: Model this - duration = Duration(cmd_response.param.get("duration", 120)) - status = servo.utilities.key_paths.value_for_key_path(cmd_response.param, "data.status", None) - reason = servo.utilities.key_paths.value_for_key_path( - cmd_response.param, "data.reason", "unknown reason" + components_count = len(description.components) + settings_count = sum( + len(component.settings) for component in description.components + ) + self.logger.success( + f"Adjusted: {components_count} components, {settings_count} settings" ) - msg = f"{status}: {reason}" if status else f"{reason}" - self.logger.info(f"Sleeping for {duration} ({msg}).") - await asyncio.sleep(duration.total_seconds()) + except servo.EventError as error: + self.logger.error(f"Adjustment failed: {error}") + status = servo.api.Status.from_error(error) + self.logger.error(f"Responding with {status.dict()}") + self.logger.opt(exception=error).debug("Adjust failure details") + + return await self._post_event(servo.api.Events.adjust, status.dict()) + + elif cmd_response.command == servo.api.Commands.sleep: + # TODO: Model this + duration = Duration(cmd_response.param.get("duration", 120)) + status = servo.utilities.key_paths.value_for_key_path(cmd_response.param, "data.status", None) + reason = servo.utilities.key_paths.value_for_key_path( + cmd_response.param, "data.reason", "unknown reason" + ) + msg = f"{status}: {reason}" if status else f"{reason}" + self.logger.info(f"Sleeping for {duration} ({msg}).") + await asyncio.sleep(duration.total_seconds()) - # Return a status so we have a simple API contract - return servo.api.Status(status="ok", message=msg) - else: - raise ValueError(f"Unknown command '{cmd_response.command.value}'") - finally: - self.logger.debug("at tail of runner.exec_command") + # Return a status so we have a simple API contract + return servo.api.Status(status="ok", message=msg) + else: + raise ValueError(f"Unknown command '{cmd_response.command.value}'") # Main run loop for processing commands from the optimizer # NOTE this is not an asyncio.loop, just a standard main() function From df4103972a35b7c4a087b90ca26c051dc9ebc1a1 Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Tue, 14 Dec 2021 16:29:14 -0500 Subject: [PATCH 4/7] removed and downleveled logging relevant to this feature; improved inline developer docs --- servo/runner.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/servo/runner.py b/servo/runner.py index 471ffc18c..4571fed18 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -193,12 +193,9 @@ async def exec_command(self) -> servo.api.Status: # NOTE this is not an asyncio.loop, just a standard main() function async def main_loop(self) -> None: # FIXME: We have seen exceptions from using `with self.servo.current()` crossing contexts - self.logger.debug("setting servo") _set_current_servo(self.servo) - self.logger.debug("set servo") while self._running: - self.logger.debug("still runner._running") try: if self.interactive: if not typer.confirm("Poll for next command?"): @@ -206,9 +203,7 @@ async def main_loop(self) -> None: await asyncio.sleep(60) continue - self.logger.debug("awaiting exec_command") status = await self.exec_command() - self.logger.debug("awaited exec_command") if status.status == servo.api.OptimizerStatuses.unexpected_event: self.logger.warning( f"server reported unexpected event: {status.reason}" @@ -226,13 +221,13 @@ async def main_loop(self) -> None: raise error def run_main_loop(self) -> None: - if self._main_loop_task: # i.e. if this isn't the first time the run_main_loop() has been called + if self._main_loop_task: self._main_loop_task.cancel() - self.logger.debug("old main_loop_task cancelled") + self.logger.trace(f"task '{self._main_loop_task.get_name}' cancelled") if self.servo is not None: loop = asyncio.get_event_loop() loop.create_task(self.servo.dispatch_event(servo.Events.startup)) - self.logger.debug("startup Event dispatched") + self.logger.trace("startup event dispatched") def _reraise_if_necessary(task: asyncio.Task) -> None: try: @@ -248,7 +243,6 @@ def _reraise_if_necessary(task: asyncio.Task) -> None: self.logger.debug(f"creating new main loop: {main_loop_name}") self._main_loop_task = asyncio.create_task(self.main_loop(), name=main_loop_name) self._main_loop_task.add_done_callback(_reraise_if_necessary) - self.logger.debug(f"created new main loop: {main_loop_name}") async def run(self, *, poll: bool = True) -> None: self._running = True @@ -320,7 +314,9 @@ class Config: arbitrary_types_allowed = True def __init__(self, assembly: servo.Assembly, **kwargs) -> None: - # This allows to be passed as a positional argument + # Defining an __init__ explicitly within a pydantic.BaseModel as we've done here allows the + # argument to be passed as a positional argument. It's entirely to facilitate the + # clarity of code in blocks that create AssemblyRunner objects. super().__init__(assembly=assembly, **kwargs) def _runner_for_servo(self, servo: servo.Servo) -> ServoRunner: @@ -388,7 +384,7 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception) t for t in asyncio.all_tasks() if t is not asyncio.current_task() ] self.logger.info(f"Cancelling {len(tasks)} outstanding tasks") - [self.logger.debug(f"\t{task.get_name()}") for task in tasks] + [self.logger.trace(f"\t{task.get_name()}") for task in tasks] [task.cancel() for task in tasks] await asyncio.gather(*tasks, return_exceptions=True) @@ -418,7 +414,6 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception) loop.create_task(servo_runner.run(poll=poll)) self.runners.append(servo_runner) loop.run_forever() - finally: loop.close() From 7dc69ea095d9fefeb579fdaf2e6eabe9e61789af Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Tue, 14 Dec 2021 16:30:54 -0500 Subject: [PATCH 5/7] removed a conditional that isn't necessary because the servo runner's servo attribute is not typed as Optional --- servo/runner.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/servo/runner.py b/servo/runner.py index 4571fed18..9936e7234 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -224,10 +224,9 @@ def run_main_loop(self) -> None: if self._main_loop_task: self._main_loop_task.cancel() self.logger.trace(f"task '{self._main_loop_task.get_name}' cancelled") - if self.servo is not None: - loop = asyncio.get_event_loop() - loop.create_task(self.servo.dispatch_event(servo.Events.startup)) - self.logger.trace("startup event dispatched") + loop = asyncio.get_event_loop() + loop.create_task(self.servo.dispatch_event(servo.Events.startup)) + self.logger.trace("startup event dispatched") def _reraise_if_necessary(task: asyncio.Task) -> None: try: From aa2800877fc457ac0307086cf2872196c980f88b Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Fri, 7 Jan 2022 16:41:03 -0500 Subject: [PATCH 6/7] removed debugging uuids from main_loop_names --- servo/runner.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/servo/runner.py b/servo/runner.py index 9936e7234..49d07896a 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -1,6 +1,5 @@ from __future__ import annotations -import uuid import asyncio import functools import os @@ -223,10 +222,8 @@ async def main_loop(self) -> None: def run_main_loop(self) -> None: if self._main_loop_task: self._main_loop_task.cancel() - self.logger.trace(f"task '{self._main_loop_task.get_name}' cancelled") loop = asyncio.get_event_loop() loop.create_task(self.servo.dispatch_event(servo.Events.startup)) - self.logger.trace("startup event dispatched") def _reraise_if_necessary(task: asyncio.Task) -> None: try: @@ -237,8 +234,7 @@ def _reraise_if_necessary(task: asyncio.Task) -> None: self.logger.opt(exception=error).trace(f"Exception raised by task {task}") raise error # Ensure that we surface the error for handling - _uid = str(uuid.uuid4())[-6:] - main_loop_name = f"main loop for servo {self.optimizer.id} {_uid}" + main_loop_name = f"main loop for servo {self.optimizer.id}" self.logger.debug(f"creating new main loop: {main_loop_name}") self._main_loop_task = asyncio.create_task(self.main_loop(), name=main_loop_name) self._main_loop_task.add_done_callback(_reraise_if_necessary) From 8d55a192c88de736051fe8cce862efb730303fd1 Mon Sep 17 00:00:00 2001 From: Eric Kalosa-Kenyon Date: Mon, 10 Jan 2022 18:34:22 -0500 Subject: [PATCH 7/7] added comment to trigger PR re-tag --- servo/connectors/prometheus.py | 1 + 1 file changed, 1 insertion(+) diff --git a/servo/connectors/prometheus.py b/servo/connectors/prometheus.py index e87859969..26368930e 100644 --- a/servo/connectors/prometheus.py +++ b/servo/connectors/prometheus.py @@ -773,6 +773,7 @@ class PrometheusConnector(servo.BaseConnector): @servo.on_event() async def startup(self) -> None: # Continuously publish a stream of metrics broadcasting every N seconds + # Should run idempotently streaming_interval = self.config.streaming_interval if streaming_interval is not None: logger = servo.logger.bind(component=f"{self.name} -> {CHANNEL}")