From 262f3f370269f24c13f7dddf16111dd2d8fefc11 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Jul 2024 12:07:55 +0100 Subject: [PATCH 1/7] Speed up SS room sorting We do this by bulk fetching the latest stream ordering --- synapse/handlers/sliding_sync.py | 31 +++-- .../databases/main/event_federation.py | 5 + synapse/storage/databases/main/stream.py | 119 +++++++++++++++++- synapse/util/caches/stream_change_cache.py | 12 +- tests/util/test_stream_change_cache.py | 4 +- 5 files changed, 148 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 423f0329d62..06d7d43f0ba 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1217,25 +1217,11 @@ async def sort_rooms( # Assemble a map of room ID to the `stream_ordering` of the last activity that the # user should see in the room (<= `to_token`) last_activity_in_room_map: Dict[str, int] = {} + for room_id, room_for_user in sync_room_map.items(): # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. - if room_for_user.membership == Membership.JOIN: - last_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key - ) - ) - - # If the room has no events at/before the `to_token`, this is probably a - # mistake in the code that generates the `sync_room_map` since that should - # only give us rooms that the user had membership in during the token range. - assert last_event_result is not None - - _, event_pos = last_event_result - - last_activity_in_room_map[room_id] = event_pos.stream - else: + if room_for_user.membership != Membership.JOIN: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. # @@ -1245,6 +1231,19 @@ async def sort_rooms( # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + joined_room_positions = ( + await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + [ + room_id + for room_id, room_for_user in sync_room_map.items() + if room_for_user.membership == Membership.JOIN + ], + to_token.room_key, + ) + ) + + last_activity_in_room_map.update(joined_room_positions) + return sorted( sync_room_map.values(), # Sort by the last activity (stream_ordering) in the room diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 24abab4a235..715846865b0 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1313,6 +1313,11 @@ async def get_forward_extremities_for_room_at_stream_ordering( # We want to make the cache more effective, so we clamp to the last # change before the given ordering. last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) # type: ignore[attr-defined] + if last_change is None: + # If the room isn't in the cache we know that the last change was + # somewhere before the earliest known position of the cache, so we + # can clamp to that. + last_change = self._events_stream_cache.get_earliest_known_position() # type: ignore[attr-defined] # We don't always have a full stream_to_exterm_id table, e.g. after # the upgrade that introduced it, so we make sure we never ask for a diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index e74e0d2e916..ca6125d7858 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -78,10 +78,11 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken +from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable +from synapse.util.iterutils import batch_iter if TYPE_CHECKING: from synapse.server import HomeServer @@ -1293,6 +1294,122 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( get_last_event_pos_in_room_before_stream_ordering_txn, ) + async def bulk_get_last_event_pos_in_room_before_stream_ordering( + self, + room_ids: StrCollection, + end_token: RoomStreamToken, + ) -> Dict[str, int]: + """Bulk fetch the stream position of the latest events in the given + rooms + """ + + min_token = end_token.stream + max_token = end_token.get_max_stream_pos() + results: Dict[str, int] = {} + + # First, we check for the rooms in the stream change cache to see if we + # can just use the latetst position from it. + missing_room_ids: Set[str] = set() + for room_id in room_ids: + stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id) + if stream_pos and stream_pos <= min_token: + results[room_id] = stream_pos + else: + missing_room_ids.add(room_id) + + # Next, we query the stream position from the DB. At first we fetch all + # positions less than the *max* stream pos in the token, then filter + # them down. We do this as a) this is a cheaper query, and b) the vast + # majority of rooms will have a latest token from before the min stream + # pos. + + def bulk_get_last_event_pos_txn( + txn: LoggingTransaction, batch_room_ids: StrCollection + ) -> Dict[str, int]: + # This query fetches the latest stream position in the rooms before + # the given max position. + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", batch_room_ids + ) + sql = f""" + SELECT room_id, ( + SELECT stream_ordering FROM events AS e + LEFT JOIN rejections USING (event_id) + WHERE e.room_id = r.room_id + AND stream_ordering <= ? + AND NOT outlier + AND rejection_reason IS NULL + ORDER BY stream_ordering DESC + LIMIT 1 + ) + FROM rooms AS r + WHERE {clause} + """ + txn.execute(sql, [max_token] + args) + return {row[0]: row[1] for row in txn} + + recheck_rooms: Set[str] = set() + for batched in batch_iter(missing_room_ids, 1000): + result = await self.db_pool.runInteraction( + "bulk_get_last_event_pos_in_room_before_stream_ordering", + bulk_get_last_event_pos_txn, + batched, + ) + + # Check that the stream position for the rooms are from before the + # minimum position of the token. If not then we need to fetch more + # rows. + for room_id, stream in result.items(): + if min_token < stream: + recheck_rooms.add(room_id) + else: + results[room_id] = stream + + if not recheck_rooms: + return results + + # For the remaining rooms we need to fetch all rows between the min and + # max stream positions in the end token, and filter out the rows that + # are after the end token. + # + # This query should be fast as the range between the min and max should + # be small. + + def bulk_get_last_event_pos_recheck_txn( + txn: LoggingTransaction, batch_room_ids: StrCollection + ) -> Dict[str, int]: + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", batch_room_ids + ) + sql = f""" + SELECT room_id, instance_name, stream_ordering + FROM events + WHERE ? < stream_ordering AND stream_ordering <= ? + AND NOT outlier + AND rejection_reason IS NULL + AND {clause} + ORDER BY stream_ordering ASC + """ + txn.execute(sql, [min_token, max_token] + args) + results: Dict[str, int] = {} + for row in txn: + room_id = row[0] + event_pos = PersistedEventPosition(row[1], row[2]) + if not event_pos.persisted_after(end_token): + results[room_id] = event_pos.stream + + return results + + for batched in batch_iter(recheck_rooms, 1000): + recheck_result = await self.db_pool.runInteraction( + "bulk_get_last_event_pos_in_room_before_stream_ordering_recheck", + bulk_get_last_event_pos_recheck_txn, + batched, + ) + results.update(recheck_result) + + return results + async def get_current_room_stream_token_for_room_id( self, room_id: str ) -> RoomStreamToken: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 91c335f85b0..16fcb00206f 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -327,7 +327,7 @@ def _evict(self) -> None: for entity in r: self._entity_to_key.pop(entity, None) - def get_max_pos_of_last_change(self, entity: EntityType) -> int: + def get_max_pos_of_last_change(self, entity: EntityType) -> Optional[int]: """Returns an upper bound of the stream id of the last change to an entity. @@ -335,7 +335,11 @@ def get_max_pos_of_last_change(self, entity: EntityType) -> int: entity: The entity to check. Return: - The stream position of the latest change for the given entity or - the earliest known stream position if the entitiy is unknown. + The stream position of the latest change for the given entity, if + known """ - return self._entity_to_key.get(entity, self._earliest_known_stream_pos) + return self._entity_to_key.get(entity) + + def get_earliest_known_position(self) -> int: + """Returns the earliest position in the cache.""" + return self._earliest_known_stream_pos diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index 5d38718a509..af1199ef8a5 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -249,5 +249,5 @@ def test_max_pos(self) -> None: self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3) self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4) - # Unknown entities will return the stream start position. - self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1) + # Unknown entities will return None + self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None) From 1385cea78eeda8c258ff4affae43f0cc6561ff62 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Jul 2024 14:07:53 +0100 Subject: [PATCH 2/7] Newsfile --- changelog.d/17468.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17468.misc diff --git a/changelog.d/17468.misc b/changelog.d/17468.misc new file mode 100644 index 00000000000..d9087762045 --- /dev/null +++ b/changelog.d/17468.misc @@ -0,0 +1 @@ +Speed up sorting of the room list in sliding sync. From 5112c0dae93378e0c25e3c2aedfce4d71012da8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jul 2024 13:01:01 +0100 Subject: [PATCH 3/7] Apply suggestions from code review Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- synapse/storage/databases/main/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ca6125d7858..811d4d69a16 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1308,7 +1308,7 @@ async def bulk_get_last_event_pos_in_room_before_stream_ordering( results: Dict[str, int] = {} # First, we check for the rooms in the stream change cache to see if we - # can just use the latetst position from it. + # can just use the latest position from it. missing_room_ids: Set[str] = set() for room_id in room_ids: stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id) From d9d3c1ac464d49e8ec0e02246521931ff78fac89 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jul 2024 13:01:12 +0100 Subject: [PATCH 4/7] Fixup comment --- synapse/handlers/sliding_sync.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 06d7d43f0ba..7657b177b1c 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1219,18 +1219,18 @@ async def sort_rooms( last_activity_in_room_map: Dict[str, int] = {} for room_id, room_for_user in sync_room_map.items(): - # If they are fully-joined to the room, let's find the latest activity - # at/before the `to_token`. if room_for_user.membership != Membership.JOIN: - # Otherwise, if the user has left/been invited/knocked/been banned from - # a room, they shouldn't see anything past that point. + # If the user has left/been invited/knocked/been banned from a + # room, they shouldn't see anything past that point. # - # FIXME: It's possible that people should see beyond this point in - # invited/knocked cases if for example the room has + # FIXME: It's possible that people should see beyond this point + # in invited/knocked cases if for example the room has # `invite`/`world_readable` history visibility, see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + # For fully-joined rooms, we find the latest activity at/before the + # `to_token`. joined_room_positions = ( await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( [ From c0dfe4c3bd532cef60a3e099e5ad6e37747a74b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jul 2024 13:04:58 +0100 Subject: [PATCH 5/7] Don't shadow var --- synapse/storage/databases/main/stream.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 811d4d69a16..89d802371d6 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1391,14 +1391,14 @@ def bulk_get_last_event_pos_recheck_txn( ORDER BY stream_ordering ASC """ txn.execute(sql, [min_token, max_token] + args) - results: Dict[str, int] = {} + txn_results: Dict[str, int] = {} for row in txn: room_id = row[0] event_pos = PersistedEventPosition(row[1], row[2]) if not event_pos.persisted_after(end_token): - results[room_id] = event_pos.stream + txn_results[room_id] = event_pos.stream - return results + return txn_results for batched in batch_iter(recheck_rooms, 1000): recheck_result = await self.db_pool.runInteraction( From 78b47c3d60e60876b1e04e15be5b2c76b69a6818 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jul 2024 13:16:34 +0100 Subject: [PATCH 6/7] Flip the comparison to be more intuitive --- synapse/storage/databases/main/stream.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 89d802371d6..01fa663b64b 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1360,10 +1360,10 @@ def bulk_get_last_event_pos_txn( # minimum position of the token. If not then we need to fetch more # rows. for room_id, stream in result.items(): - if min_token < stream: - recheck_rooms.add(room_id) - else: + if stream <= min_token: results[room_id] = stream + else: + recheck_rooms.add(room_id) if not recheck_rooms: return results From c784b2adc11d892062cac83e10fec0d06e9befda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jul 2024 13:23:45 +0100 Subject: [PATCH 7/7] Comment --- synapse/storage/databases/main/stream.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 01fa663b64b..b034361aec6 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1391,6 +1391,10 @@ def bulk_get_last_event_pos_recheck_txn( ORDER BY stream_ordering ASC """ txn.execute(sql, [min_token, max_token] + args) + + # We take the max stream ordering that is less than the token. Since + # we ordered by stream ordering we just need to iterate through and + # take the last matching stream ordering. txn_results: Dict[str, int] = {} for row in txn: room_id = row[0]