diff --git a/servo/connectors/prometheus.py b/servo/connectors/prometheus.py index 388a5fa06..26368930e 100644 --- a/servo/connectors/prometheus.py +++ b/servo/connectors/prometheus.py @@ -773,31 +773,30 @@ class PrometheusConnector(servo.BaseConnector): @servo.on_event() async def startup(self) -> None: # Continuously publish a stream of metrics broadcasting every N seconds + # Should run idempotently streaming_interval = self.config.streaming_interval if streaming_interval is not None: logger = servo.logger.bind(component=f"{self.name} -> {CHANNEL}") logger.info(f"Streaming Prometheus metrics every {streaming_interval}") - - @self.publish(CHANNEL, every=streaming_interval) - async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None: - report = [] - client = Client(base_url=self.config.base_url) - responses = await asyncio.gather( - *list(map(client.query, self.config.metrics)), - return_exceptions=True - ) - for response in responses: - if isinstance(response, Exception): - logger.error(f"failed querying Prometheus for metrics: {response}") - continue - - if response.data: - # NOTE: Instant queries return a single vector - timestamp, value = response.data[0].value - report.append((response.metric.name, timestamp.isoformat(), value)) - - await publisher(servo.pubsub.Message(json=report)) - logger.debug(f"Published {len(report)} metrics.") + if self.pubsub_exchange.get_channel(CHANNEL) is None: + @self.publish(CHANNEL, every=streaming_interval) + async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None: + report = [] + client = Client(base_url=self.config.base_url) + responses = await asyncio.gather( + *list(map(client.query, self.config.metrics)), + return_exceptions=True + ) + for response in responses: + if isinstance(response, Exception): + logger.error(f"failed querying Prometheus for metrics: {response}") + continue + if response.data: + # NOTE: Instant queries return a single vector + timestamp, value = response.data[0].value + report.append((response.metric.name, timestamp.isoformat(), value)) + await publisher(servo.pubsub.Message(json=report)) + logger.debug(f"Published {len(report)} metrics.") @servo.on_event() async def check( diff --git a/servo/runner.py b/servo/runner.py index e78e688e0..863bcdbb9 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -191,6 +191,7 @@ async def exec_command(self) -> servo.api.Status: raise ValueError(f"Unknown command '{cmd_response.command.value}'") # Main run loop for processing commands from the optimizer + # NOTE this is not an asyncio.loop, just a standard main() function async def main_loop(self) -> None: # FIXME: We have seen exceptions from using `with self.servo.current()` crossing contexts _set_current_servo(self.servo) @@ -223,6 +224,8 @@ async def main_loop(self) -> None: def run_main_loop(self) -> None: if self._main_loop_task: self._main_loop_task.cancel() + loop = asyncio.get_event_loop() + loop.create_task(self.servo.dispatch_event(servo.Events.startup)) if self._diagnostics_loop_task: self._diagnostics_loop_task.cancel() @@ -236,7 +239,9 @@ def _reraise_if_necessary(task: asyncio.Task) -> None: self.logger.opt(exception=error).trace(f"Exception raised by task {task}") raise error # Ensure that we surface the error for handling - self._main_loop_task = asyncio.create_task(self.main_loop(), name=f"main loop for servo {self.optimizer.id}") + main_loop_name = f"main loop for servo {self.optimizer.id}" + self.logger.debug(f"creating new main loop: {main_loop_name}") + self._main_loop_task = asyncio.create_task(self.main_loop(), name=main_loop_name) self._main_loop_task.add_done_callback(_reraise_if_necessary) if not servo.current_servo().config.no_diagnostics: @@ -316,6 +321,9 @@ class Config: arbitrary_types_allowed = True def __init__(self, assembly: servo.Assembly, **kwargs) -> None: + # Defining an __init__ explicitly within a pydantic.BaseModel as we've done here allows the + # argument to be passed as a positional argument. It's entirely to facilitate the + # clarity of code in blocks that create AssemblyRunner objects. super().__init__(assembly=assembly, **kwargs) def _runner_for_servo(self, servo: servo.Servo) -> ServoRunner: @@ -383,9 +391,12 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception) t for t in asyncio.all_tasks() if t is not asyncio.current_task() ] self.logger.info(f"Cancelling {len(tasks)} outstanding tasks") + [self.logger.trace(f"\t{task.get_name()}") for task in tasks] [task.cancel() for task in tasks] await asyncio.gather(*tasks, return_exceptions=True) + self.logger.trace("Cancelled tasks:") + [self.logger.trace(f"\t{task.get_name()} {'cancelled' if task.cancelled() else 'not cancelled'}") for task in tasks] # Restart a fresh main loop if poll: @@ -410,9 +421,7 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception) servo_runner = ServoRunner(servo_, interactive=interactive) loop.create_task(servo_runner.run(poll=poll)) self.runners.append(servo_runner) - loop.run_forever() - finally: loop.close()