Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 20 additions & 53 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
REMOVE_DEVICES_FROM_INBOX = "remove_devices_from_device_inbox"
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"

def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
Expand All @@ -615,19 +615,18 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)

self.db_pool.updates.register_background_update_handler(
self.REMOVE_DELETED_DEVICES,
self._remove_deleted_devices_from_device_inbox,
)

self.db_pool.updates.register_background_update_handler(
self.REMOVE_HIDDEN_DEVICES,
self._remove_hidden_devices_from_device_inbox,
# Used to be a background update that deletes all device_inboxes for deleted
# devices.
self.db_pool.updates.register_noop_background_update(
self.REMOVE_DELETED_DEVICES
)
# Used to be a background update that deletes all device_inboxes for hidden
# devices.
self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES)

self.db_pool.updates.register_background_update_handler(
self.REMOVE_DEVICES_FROM_INBOX,
self._remove_devices_from_device_inbox,
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
self._remove_dead_devices_from_device_inbox,
)

async def _background_drop_index_device_inbox(self, progress, batch_size):
Expand All @@ -642,43 +641,7 @@ def reindex_txn(conn):

return 1

async def _remove_deleted_devices_from_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
"""No-op.

Used to be a background update that deletes all device_inboxes for deleted devices.

Args:
progress: JsonDict used to store progress of this background update
batch_size: the maximum number of rows to retrieve in a single select query

Returns:
The number of deleted rows
"""
await self.db_pool.updates._end_background_update(self.REMOVE_DELETED_DEVICES)

return 0

async def _remove_hidden_devices_from_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
"""No-op.

Used to be a background update that deletes all device_inboxes for hidden devices.

Args:
progress: JsonDict used to store progress of this background update
batch_size: the maximum number of rows to retrieve in a single select query

Returns:
The number of deleted rows
"""
await self.db_pool.updates._end_background_update(self.REMOVE_HIDDEN_DEVICES)

return 0

async def _remove_devices_from_device_inbox(
async def _remove_dead_devices_from_device_inbox(
self,
progress: JsonDict,
batch_size: int,
Expand All @@ -694,7 +657,7 @@ async def _remove_devices_from_device_inbox(
The number of rows deleted.
"""

def _remove_devices_from_device_inbox_txn(
def _remove_dead_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> Tuple[int, bool]:

Expand All @@ -707,13 +670,17 @@ def _remove_devices_from_device_inbox_txn(
# res can't be None.
res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment]
if res[0] is None:
# this can only happen if the `device_inbox` table is empty, in which
# case we have no work to do.
return 0, True
else:
max_stream_id = res[0]

start = progress.get("stream_id", 0)
stop = start + batch_size

# delete rows in `device_inbox` which do *not* correspond to a known,
# unhidden device.
sql = """
DELETE FROM device_inbox
WHERE
Expand All @@ -732,7 +699,7 @@ def _remove_devices_from_device_inbox_txn(

self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_DEVICES_FROM_INBOX,
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
{
"stream_id": stop,
"max_stream_id": max_stream_id,
Expand All @@ -743,15 +710,15 @@ def _remove_devices_from_device_inbox_txn(

num_deleted, finished = await self.db_pool.runInteraction(
"_remove_devices_from_device_inbox_txn",
_remove_devices_from_device_inbox_txn,
_remove_dead_devices_from_device_inbox_txn,
)

if finished:
await self.db_pool.updates._end_background_update(
self.REMOVE_DEVICES_FROM_INBOX,
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
)

return num_deleted
return batch_size


class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

-- Background update to clear the inboxes of hidden and deleted devices.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6508, 'remove_devices_from_device_inbox', '{}');
(6508, 'remove_dead_devices_from_device_inbox', '{}');
4 changes: 2 additions & 2 deletions tests/storage/databases/main/test_deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_background_remove_deleted_devices_from_device_inbox(self):
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": "remove_devices_from_device_inbox",
"update_name": "remove_dead_devices_from_device_inbox",
"progress_json": "{}",
},
)
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_background_remove_hidden_devices_from_device_inbox(self):
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": "remove_devices_from_device_inbox",
"update_name": "remove_dead_devices_from_device_inbox",
"progress_json": "{}",
},
)
Expand Down