Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions servo/connectors/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,31 +773,30 @@ class PrometheusConnector(servo.BaseConnector):
@servo.on_event()
async def startup(self) -> None:
# Continuously publish a stream of metrics broadcasting every N seconds
# Should run idempotently
streaming_interval = self.config.streaming_interval
if streaming_interval is not None:
logger = servo.logger.bind(component=f"{self.name} -> {CHANNEL}")
logger.info(f"Streaming Prometheus metrics every {streaming_interval}")

@self.publish(CHANNEL, every=streaming_interval)
async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None:
report = []
client = Client(base_url=self.config.base_url)
responses = await asyncio.gather(
*list(map(client.query, self.config.metrics)),
return_exceptions=True
)
for response in responses:
if isinstance(response, Exception):
logger.error(f"failed querying Prometheus for metrics: {response}")
continue

if response.data:
# NOTE: Instant queries return a single vector
timestamp, value = response.data[0].value
report.append((response.metric.name, timestamp.isoformat(), value))

await publisher(servo.pubsub.Message(json=report))
logger.debug(f"Published {len(report)} metrics.")
if self.pubsub_exchange.get_channel(CHANNEL) is None:
@self.publish(CHANNEL, every=streaming_interval)
async def _publish_metrics(publisher: servo.pubsub.Publisher) -> None:
report = []
client = Client(base_url=self.config.base_url)
responses = await asyncio.gather(
*list(map(client.query, self.config.metrics)),
return_exceptions=True
)
for response in responses:
if isinstance(response, Exception):
logger.error(f"failed querying Prometheus for metrics: {response}")
continue
if response.data:
# NOTE: Instant queries return a single vector
timestamp, value = response.data[0].value
report.append((response.metric.name, timestamp.isoformat(), value))
await publisher(servo.pubsub.Message(json=report))
logger.debug(f"Published {len(report)} metrics.")

@servo.on_event()
async def check(
Expand Down
15 changes: 12 additions & 3 deletions servo/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ async def exec_command(self) -> servo.api.Status:
raise ValueError(f"Unknown command '{cmd_response.command.value}'")

# Main run loop for processing commands from the optimizer
# NOTE this is not an asyncio.loop, just a standard main() function
async def main_loop(self) -> None:
# FIXME: We have seen exceptions from using `with self.servo.current()` crossing contexts
_set_current_servo(self.servo)
Expand Down Expand Up @@ -223,6 +224,8 @@ async def main_loop(self) -> None:
def run_main_loop(self) -> None:
if self._main_loop_task:
self._main_loop_task.cancel()
loop = asyncio.get_event_loop()
loop.create_task(self.servo.dispatch_event(servo.Events.startup))
Comment on lines 224 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def run_main_loop(self) -> None:
if self._main_loop_task:
self._main_loop_task.cancel()
loop = asyncio.get_event_loop()
loop.create_task(self.servo.dispatch_event(servo.Events.startup))
async def run_main_loop(self) -> None:
if self._main_loop_task:
self._main_loop_task.cancel()
await self.servo.startup()
self.logger.info(
f"Servo started with {len(self.servo.connectors)} active connectors [{self.optimizer.id} @ {self.optimizer.url or self.optimizer.base_url}]"
)

I agree we should update the startup/shutdown lifecycle given the implementation of cancel responses. However, we need to update a few more places for the sake of completeness:

  • We're moving this startup into run_main_loop() (delete the old one):
    await self.servo.startup()
  • await run_main_loop() now that it's async:
  • Shut down the servo during cancellation handling:
    • servox/servo/runner.py

      Lines 373 to 404 in 6ce0cbc

      if isinstance(error, (servo.errors.UnexpectedEventError, servo.errors.EventCancelledError)):
      if isinstance(error, servo.errors.UnexpectedEventError):
      self.logger.error(
      "servo has lost synchronization with the optimizer: restarting"
      )
      elif isinstance(error, servo.errors.EventCancelledError):
      self.logger.error(
      "optimizer has cancelled operation in progress: cancelling and restarting loop"
      )
      # Post a status to resolve the operation
      operation = progress['operation']
      status = servo.api.Status.from_error(error)
      self.logger.error(f"Responding with {status.dict()}")
      runner = self._runner_for_servo(servo.current_servo())
      await runner._post_event(operation, status.dict())
      tasks = [
      t for t in asyncio.all_tasks() if t is not asyncio.current_task()
      ]
      self.logger.info(f"Cancelling {len(tasks)} outstanding tasks")
      [self.logger.trace(f"\t{task.get_name()}") for task in tasks]
      [task.cancel() for task in tasks]
      await asyncio.gather(*tasks, return_exceptions=True)
      self.logger.trace("Cancelled tasks:")
      [self.logger.trace(f"\t{task.get_name()} {'cancelled' if task.cancelled() else 'not cancelled'}") for task in tasks]
      # Restart a fresh main loop
      if poll:
      runner = self._runner_for_servo(servo.current_servo())
      runner.run_main_loop()
    • servox/servo/runner.py

      Lines 518 to 527 in 6ce0cbc

      # Shut down the servo runners, breaking active control loops
      if len(self.runners) == 1:
      self.logger.info(f"Shutting down servo...")
      else:
      self.logger.info(f"Shutting down {len(self.runners)} running servos...")
      for fut in asyncio.as_completed(list(map(lambda r: r.shutdown(reason=reason), self.runners)), timeout=30.0):
      try:
      await fut
      except Exception as error:
      self.logger.critical(f"Failed servo runner shutdown with error: {error}")


if self._diagnostics_loop_task:
self._diagnostics_loop_task.cancel()
Expand All @@ -236,7 +239,9 @@ def _reraise_if_necessary(task: asyncio.Task) -> None:
self.logger.opt(exception=error).trace(f"Exception raised by task {task}")
raise error # Ensure that we surface the error for handling

self._main_loop_task = asyncio.create_task(self.main_loop(), name=f"main loop for servo {self.optimizer.id}")
main_loop_name = f"main loop for servo {self.optimizer.id}"
self.logger.debug(f"creating new main loop: {main_loop_name}")
self._main_loop_task = asyncio.create_task(self.main_loop(), name=main_loop_name)
self._main_loop_task.add_done_callback(_reraise_if_necessary)

if not servo.current_servo().config.no_diagnostics:
Expand Down Expand Up @@ -316,6 +321,9 @@ class Config:
arbitrary_types_allowed = True

def __init__(self, assembly: servo.Assembly, **kwargs) -> None:
# Defining an __init__ explicitly within a pydantic.BaseModel as we've done here allows the
# <assembly> argument to be passed as a positional argument. It's entirely to facilitate the
# clarity of code in blocks that create AssemblyRunner objects.
super().__init__(assembly=assembly, **kwargs)

def _runner_for_servo(self, servo: servo.Servo) -> ServoRunner:
Expand Down Expand Up @@ -383,9 +391,12 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception)
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
self.logger.info(f"Cancelling {len(tasks)} outstanding tasks")
[self.logger.trace(f"\t{task.get_name()}") for task in tasks]
[task.cancel() for task in tasks]

await asyncio.gather(*tasks, return_exceptions=True)
self.logger.trace("Cancelled tasks:")
[self.logger.trace(f"\t{task.get_name()} {'cancelled' if task.cancelled() else 'not cancelled'}") for task in tasks]

# Restart a fresh main loop
if poll:
Expand All @@ -410,9 +421,7 @@ async def handle_progress_exception(progress: Dict[str, Any], error: Exception)
servo_runner = ServoRunner(servo_, interactive=interactive)
loop.create_task(servo_runner.run(poll=poll))
self.runners.append(servo_runner)

loop.run_forever()

finally:
loop.close()

Expand Down