-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Make background updates controllable via a plugin #11306
Changes from 20 commits
4a1a832
957da6f
0c3ba88
0ace9f8
dccddf1
c7f1498
c77bad8
dddfdca
3dcda89
4a1d77e
31a4897
d89cadd
df863fb
66aae92
f5d551a
82e880e
99ef30c
26a61b4
1a99abe
08bb9b1
8c7ec0f
589d8ea
e05809d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Add plugin support for controlling database background updates. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| # Background update controller callbacks | ||
|
|
||
| Background update controller callbacks allow module developers to control (e.g. rate-limit) | ||
| how database background updates are run. A database background update is an operation | ||
| Synapse runs on its database in the background after it starts. It's usually used to run | ||
| database operations that would take too long if they were run at the same time as schema | ||
| updates (which are run on startup) and delay Synapse's startup too much: populating a | ||
| table with a big amount of data, adding an index on a big table, etc. | ||
|
|
||
| Background update controller callbacks can be registered using the module API's | ||
| `register_background_update_controller_callbacks` method. Only the first module (in order | ||
| of appearance in Synapse's configuration file) calling this method can register background | ||
| update controller callbacks, subsequent calls are ignored. | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the plugin author get to know if their callback was ignored?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not currently. |
||
| The available background update controller callbacks are: | ||
|
|
||
| ### `on_update` | ||
|
|
||
| _First introduced in Synapse v1.48.0_ | ||
babolivier marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ```python | ||
| def on_update(update_name: str, database_name: str, one_shot: bool) -> AsyncContextManager[int] | ||
| ``` | ||
|
|
||
| Called when about to do an iteration of a background update. The module is given the name | ||
| of the update, the name of the database, and a flag to indicate whether the background | ||
| update will happen in one go and may take a long time (e.g. creating indices). If this last | ||
| argument is set to `False`, the update will be run in batches. | ||
|
|
||
| The module must return an async context manager that returns the desired duration of the | ||
| iteration, in milliseconds, and will be exited when the iteration completes. Note that the | ||
| duration returned by the context manager is a target, and an iteration may take | ||
| substantially longer or shorter. If the `one_shot` flag is set to `True`, the duration | ||
| returned is ignored. | ||
babolivier marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| __Note__: Unlike most module callbacks in Synapse, this one is _synchronous_. This is | ||
| because asynchronous operations are expected to be run by the async context manager. | ||
|
|
||
| This callback is required when registering any other background update controller callback. | ||
|
|
||
| ### `default_batch_size` | ||
|
|
||
| _First introduced in Synapse v1.48.0_ | ||
babolivier marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ```python | ||
| async def default_batch_size(update_name: str, database_name: str) -> int | ||
| ``` | ||
|
|
||
| Called before the first iteration of a background update, with the name of the update and | ||
| of the database. The module must return the number of elements to process in this first | ||
| iteration. | ||
|
|
||
| If this callback is not defined, Synapse will use a default value of 100. | ||
|
|
||
| ### `min_batch_size` | ||
|
|
||
| _First introduced in Synapse v1.48.0_ | ||
babolivier marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ```python | ||
| async def min_batch_size(update_name: str, database_name: str) -> int | ||
| ``` | ||
|
|
||
| Called before running a new batch for a background update, with the name of the update and | ||
| of the database. The module must return the minimum number of elements to process in this | ||
| iteration. This number must be greater than 0, and is used to ensure that progress is | ||
| always made. | ||
babolivier marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| If this callback is not defined, Synapse will use a default value of 100. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| List, | ||
| Optional, | ||
| Tuple, | ||
| TypeVar, | ||
| Union, | ||
| ) | ||
|
|
||
|
|
@@ -81,10 +82,19 @@ | |
| ) | ||
| from synapse.http.servlet import parse_json_object_from_request | ||
| from synapse.http.site import SynapseRequest | ||
| from synapse.logging.context import make_deferred_yieldable, run_in_background | ||
| from synapse.logging.context import ( | ||
| defer_to_thread, | ||
| make_deferred_yieldable, | ||
| run_in_background, | ||
| ) | ||
| from synapse.metrics.background_process_metrics import run_as_background_process | ||
| from synapse.rest.client.login import LoginResponse | ||
| from synapse.storage import DataStore | ||
| from synapse.storage.background_updates import ( | ||
| DEFAULT_BATCH_SIZE_CALLBACK, | ||
| MIN_BATCH_SIZE_CALLBACK, | ||
| ON_UPDATE_CALLBACK, | ||
| ) | ||
| from synapse.storage.database import DatabasePool, LoggingTransaction | ||
| from synapse.storage.databases.main.roommember import ProfileInfo | ||
| from synapse.storage.state import StateFilter | ||
|
|
@@ -104,6 +114,8 @@ | |
| from synapse.app.generic_worker import GenericWorkerSlavedStore | ||
| from synapse.server import HomeServer | ||
|
|
||
| TV = TypeVar("TV") | ||
|
|
||
| """ | ||
| This package defines the 'stable' API which can be used by extension modules which | ||
| are loaded into Synapse. | ||
|
|
@@ -307,6 +319,24 @@ def register_password_auth_provider_callbacks( | |
| auth_checkers=auth_checkers, | ||
| ) | ||
|
|
||
| def register_background_update_controller_callbacks( | ||
| self, | ||
| on_update: ON_UPDATE_CALLBACK, | ||
| default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, | ||
| min_batch_size: Optional[MIN_BATCH_SIZE_CALLBACK] = None, | ||
| ) -> None: | ||
| """Registers background update controller callbacks. | ||
|
|
||
| Added in Synapse v1.49.0. | ||
| """ | ||
|
|
||
| for db in self._hs.get_datastores().databases: | ||
| db.updates.register_update_controller_callbacks( | ||
| on_update=on_update, | ||
| default_batch_size=default_batch_size, | ||
| min_batch_size=min_batch_size, | ||
| ) | ||
|
|
||
| def register_web_resource(self, path: str, resource: Resource): | ||
| """Registers a web resource to be served at the given path. | ||
|
|
||
|
|
@@ -970,6 +1000,11 @@ def looping_background_call( | |
| f, | ||
| ) | ||
|
|
||
| async def sleep(self, seconds: float) -> None: | ||
| """Sleeps for the given number of seconds.""" | ||
|
|
||
| await self._clock.sleep(seconds) | ||
|
|
||
| async def send_mail( | ||
| self, | ||
| recipient: str, | ||
|
|
@@ -1124,6 +1159,26 @@ async def get_room_state( | |
|
|
||
| return {key: state_events[event_id] for key, event_id in state_ids.items()} | ||
|
|
||
| async def defer_to_thread( | ||
| self, | ||
| f: Callable[..., TV], | ||
| *args: Any, | ||
| **kwargs: Any, | ||
| ) -> TV: | ||
| """Runs the given function in a separate thread from Synapse's thread pool. | ||
|
|
||
| Added in Synapse v1.49.0. | ||
|
|
||
| Args: | ||
| f: The function to run. | ||
| args: The function's arguments. | ||
| kwargs: The function's keyword arguments. | ||
|
|
||
| Returns: | ||
| The return value of the function once ran in a thread. | ||
| """ | ||
| return await defer_to_thread(self._hs.get_reactor(), f, *args, **kwargs) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While not directly related with the background update work in this PR it's going to be used by https://gitlab.matrix.org/new-vector/ems-synapse-background-update-controller and it's a small change so I thought I'd include it in here. I can appreciate if people think it's too much scope-creeping, and would prefer me to open a separate PR for it. |
||
|
|
||
| class PublicRoomListManager: | ||
| """Contains methods for adding to, removing from and querying whether a room | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.