-
Notifications
You must be signed in to change notification settings - Fork 411
Sliding sync: Add classes for per-connection state #17574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
da5339d
baac6c5
0561c86
2e7672d
64310ec
79e80eb
d982efe
b0a5c0e
577370a
27b7a4a
dec5314
5b6755a
7f5bccc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Refactor per-connection state in experimental sliding sync handler. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,8 @@ | |
| # | ||
| import enum | ||
| import logging | ||
| import typing | ||
| from collections import ChainMap | ||
| from enum import Enum | ||
| from itertools import chain | ||
| from typing import ( | ||
|
|
@@ -30,11 +32,13 @@ | |
| List, | ||
| Literal, | ||
| Mapping, | ||
| MutableMapping, | ||
| Optional, | ||
| Sequence, | ||
| Set, | ||
| Tuple, | ||
| Union, | ||
| cast, | ||
| ) | ||
|
|
||
| import attr | ||
|
|
@@ -571,21 +575,21 @@ async def current_sync_for_user( | |
| # See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
| raise NotImplementedError() | ||
|
|
||
| if from_token: | ||
| # Check that we recognize the connection position, if not tell the | ||
| # clients that they need to start again. | ||
| # | ||
| # If we don't do this and the client asks for the full range of | ||
| # rooms, we end up sending down all rooms and their state from | ||
| # scratch (which can be very slow). By expiring the connection we | ||
| # allow the client a chance to do an initial request with a smaller | ||
| # range of rooms to get them some results sooner but will end up | ||
| # taking the same amount of time (more with round-trips and | ||
| # re-processing) in the end to get everything again. | ||
| if not await self.connection_store.is_valid_token( | ||
| sync_config, from_token.connection_position | ||
| ): | ||
| raise SlidingSyncUnknownPosition() | ||
| # Get the per-connection state (if any). | ||
| # | ||
| # Raises an exception if there is a `connection_position` that we don't | ||
| # recognize. If we don't do this and the client asks for the full range | ||
| # of rooms, we end up sending down all rooms and their state from | ||
| # scratch (which can be very slow). By expiring the connection we allow | ||
| # the client a chance to do an initial request with a smaller range of | ||
| # rooms to get them some results sooner but will end up taking the same | ||
| # amount of time (more with round-trips and re-processing) in the end to | ||
| # get everything again. | ||
| previous_connection_state = ( | ||
| await self.connection_store.get_per_connection_state( | ||
| sync_config, from_token | ||
| ) | ||
| ) | ||
|
|
||
| await self.connection_store.mark_token_seen( | ||
| sync_config=sync_config, | ||
|
|
@@ -781,11 +785,7 @@ async def current_sync_for_user( | |
| # we haven't sent the room down, or we have but there are missing | ||
| # updates). | ||
| for room_id in relevant_room_map: | ||
| status = await self.connection_store.have_sent_room( | ||
| sync_config, | ||
| from_token.connection_position, | ||
| room_id, | ||
| ) | ||
| status = previous_connection_state.rooms.have_sent_room(room_id) | ||
| if ( | ||
| # The room was never sent down before so the client needs to know | ||
| # about it regardless of any updates. | ||
|
|
@@ -821,6 +821,7 @@ async def current_sync_for_user( | |
| async def handle_room(room_id: str) -> None: | ||
| room_sync_result = await self.get_room_sync_data( | ||
| sync_config=sync_config, | ||
| per_connection_state=previous_connection_state, | ||
| room_id=room_id, | ||
| room_sync_config=relevant_rooms_to_send_map[room_id], | ||
| room_membership_for_user_at_to_token=room_membership_for_user_map[ | ||
|
|
@@ -853,6 +854,8 @@ async def handle_room(room_id: str) -> None: | |
| ) | ||
|
|
||
| if has_lists or has_room_subscriptions: | ||
| new_connection_state = previous_connection_state.get_mutable() | ||
|
|
||
| # We now calculate if any rooms outside the range have had updates, | ||
| # which we are not sending down. | ||
| # | ||
|
|
@@ -882,11 +885,18 @@ async def handle_room(room_id: str) -> None: | |
| ) | ||
| unsent_room_ids = list(missing_event_map_by_room) | ||
|
|
||
| connection_position = await self.connection_store.record_rooms( | ||
| new_connection_state.rooms.record_unsent_rooms( | ||
| unsent_room_ids, from_token.stream_token | ||
| ) | ||
|
|
||
| new_connection_state.rooms.record_sent_rooms( | ||
| relevant_rooms_to_send_map.keys() | ||
| ) | ||
|
|
||
| connection_position = await self.connection_store.record_new_state( | ||
| sync_config=sync_config, | ||
| from_token=from_token, | ||
| sent_room_ids=relevant_rooms_to_send_map.keys(), | ||
| unsent_room_ids=unsent_room_ids, | ||
| per_connection_state=new_connection_state, | ||
| ) | ||
| elif from_token: | ||
| connection_position = from_token.connection_position | ||
|
|
@@ -1939,6 +1949,7 @@ async def get_current_state_at( | |
| async def get_room_sync_data( | ||
| self, | ||
| sync_config: SlidingSyncConfig, | ||
| per_connection_state: "PerConnectionState", | ||
| room_id: str, | ||
| room_sync_config: RoomSyncConfig, | ||
| room_membership_for_user_at_to_token: _RoomMembershipForUser, | ||
|
|
@@ -1986,11 +1997,7 @@ async def get_room_sync_data( | |
| from_bound = None | ||
| initial = True | ||
| if from_token and not room_membership_for_user_at_to_token.newly_joined: | ||
| room_status = await self.connection_store.have_sent_room( | ||
| sync_config=sync_config, | ||
| connection_token=from_token.connection_position, | ||
| room_id=room_id, | ||
| ) | ||
| room_status = per_connection_state.rooms.have_sent_room(room_id) | ||
| if room_status.status == HaveSentRoomFlag.LIVE: | ||
| from_bound = from_token.stream_token.room_key | ||
| initial = False | ||
|
|
@@ -3034,6 +3041,118 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": | |
| HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) | ||
|
|
||
|
|
||
@attr.s(auto_attribs=True, slots=True, frozen=True)
class RoomStatusMap:
    """Tracks, for one stream (e.g. events), what we have or have not sent
    down for that stream in each room.

    Rooms without an entry are reported as `HAVE_SENT_ROOM_NEVER`.
    """

    # `room_id` -> `HaveSentRoom`
    _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict)

    def have_sent_room(self, room_id: str) -> HaveSentRoom:
        """Look up the recorded status for `room_id`.

        Returns:
            The stored `HaveSentRoom`, or `HAVE_SENT_ROOM_NEVER` when there is
            no entry for the room.
        """
        status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER)
        return status

    def get_mutable(self) -> "MutableRoomStatusMap":
        """Wrap the current statuses in a `MutableRoomStatusMap`."""
        mutable_map = MutableRoomStatusMap(statuses=self._statuses)
        return mutable_map

    def copy(self) -> "RoomStatusMap":
        """Return a plain `RoomStatusMap` backed by a fresh `dict` of the
        current statuses.

        Useful for converting from a mutable to an immutable version.
        """
        snapshot = dict(self._statuses)
        return RoomStatusMap(statuses=snapshot)
|
|
||
|
|
||
| class MutableRoomStatusMap(RoomStatusMap): | ||
| """A mutable version of `RoomStatusMap`""" | ||
|
|
||
| _statuses: typing.ChainMap[str, HaveSentRoom] | ||
|
Comment: Are we worried about the memory footprint over time? For a long-running connection, this seems like it will grow and grow until we restart the server. Every sync request with updates will add another layer to this `ChainMap`. There's never a chance to garbage collect anything even once a map layer is completely superseded by the layers below with all of the keys. Seems like we should do a clean-up/consolidation pass after a certain number of layers.

Comment: Yeah, so actually when we store the mutable connection state we end up doing a `.copy()` [remainder of this sentence lost in extraction]. I'm also wanting to rip all this out and replace it with DB tables once the receipts and subscription PRs land, so I'm not overly concerned about resources for this stuff.

Comment: Ahh, we should add some comments in various places. Something about how the [inline code and text lost in extraction]. Or maybe a better word about, we only ever use the [lost in extraction]. Because we [lost in extraction].

Comment: Thanks, will try and add some comments.

Comment: Have tried to add a couple of comments in 7f5bccc |
||
|
|
||
def __init__(
    self,
    statuses: Mapping[str, HaveSentRoom],
) -> None:
    # `ChainMap` is typed as taking mutable mappings, hence the cast. Writes
    # made through a `ChainMap` only ever touch its first map (the fresh `{}`
    # below), so `statuses` itself is not mutated.
    base_layer = cast(MutableMapping, statuses)
    layered = ChainMap({}, base_layer)

    super().__init__(statuses=layered)
|
|
||
def get_updates(self) -> Mapping[str, HaveSentRoom]:
    """Return only the changes that were made.

    These are the entries held in the first map of the underlying `ChainMap`.
    """
    own_layer, *_ = self._statuses.maps
    return own_layer
|
|
||
def record_sent_rooms(self, room_ids: StrCollection) -> None:
    """Record that we have sent these rooms in the response.

    Rooms whose current status is already `LIVE` are left untouched, so no
    update is written for them.
    """
    for room_id in room_ids:
        existing = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER)
        if existing.status != HaveSentRoomFlag.LIVE:
            self._statuses[room_id] = HAVE_SENT_ROOM_LIVE
|
|
||
def record_unsent_rooms(
    self, room_ids: StrCollection, from_token: StreamToken
) -> None:
    """Record that we have not sent these rooms in the response, but there
    have been updates.

    Args:
        room_ids: The rooms that had updates but were left out of the response.
        from_token: The token whose `room_key` is stored for any room that is
            moved from `LIVE` to `PREVIOUSLY`.
    """
    # What happens to an unsent room depends on its existing entry:
    #   - LIVE: we had sent everything up to `from_token.room_key`, so the
    #     entry becomes `PREVIOUSLY` with that token.
    #   - PREVIOUSLY: we had already sent everything up to *a* given token,
    #     so the entry is left as it is.
    #   - NEVER: the room was never sent, and was not sent this time either,
    #     so it stays `NEVER`.
    for room_id in room_ids:
        existing = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER)
        if existing.status == HaveSentRoomFlag.LIVE:
            self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key)
|
|
||
|
|
||
| @attr.s(auto_attribs=True) | ||
| class PerConnectionState: | ||
|
Comment: I'm having a lot of trouble keeping all of these structures straight: [list of structures lost in extraction]
Maybe it's just a naming thing or because this is my first exposure.

Comment: Yeah, it's a lot. I don't know if maybe it'd help moving to a different file so it's a bit more clearly laid out? Suggestions on naming are also more than welcome. |
||
| """The per-connection state. A snapshot of what we've sent down the connection before. | ||
|
|
||
| Currently, we track whether we've sent down various aspects of a given room before. | ||
|
|
||
| We use the `rooms` field to store the position in the events stream for each room that we've previously sent to the client before. On the next request that includes the room, we can then send only what's changed since that recorded position. | ||
|
|
||
| Same goes for the `receipts` field so we only need to send the new receipts since the last time you made a sync request. | ||
|
|
||
| Attributes: | ||
| rooms: The status of each room for the events stream. | ||
| """ | ||
|
|
||
| rooms: RoomStatusMap = attr.Factory(RoomStatusMap) | ||
|
|
||
def get_mutable(self) -> "MutablePerConnectionState":
    """Build a `MutablePerConnectionState` whose `rooms` is the mutable form
    of this state's `rooms`."""
    mutable_rooms = self.rooms.get_mutable()
    return MutablePerConnectionState(rooms=mutable_rooms)
|
|
||
|
|
||
@attr.s(auto_attribs=True)
class MutablePerConnectionState(PerConnectionState):
    """A mutable version of `PerConnectionState`"""

    rooms: MutableRoomStatusMap

    def has_updates(self) -> bool:
        """Return whether `rooms` reports any updates via `get_updates()`."""
        pending = self.rooms.get_updates()
        return bool(pending)
|
|
||
|
|
||
| @attr.s(auto_attribs=True) | ||
| class SlidingSyncConnectionStore: | ||
| """In-memory store of per-connection state, including what rooms we have | ||
|
|
@@ -3063,9 +3182,9 @@ class SlidingSyncConnectionStore: | |
| to mapping of room ID to `HaveSentRoom`. | ||
| """ | ||
|
|
||
| # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` | ||
| _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( | ||
| attr.Factory(dict) | ||
| # `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState` | ||
| _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( | ||
| dict | ||
| ) | ||
|
|
||
| async def is_valid_token( | ||
|
|
@@ -3078,48 +3197,52 @@ async def is_valid_token( | |
| conn_key = self._get_connection_key(sync_config) | ||
| return connection_token in self._connections.get(conn_key, {}) | ||
|
|
||
| async def have_sent_room( | ||
| self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str | ||
| ) -> HaveSentRoom: | ||
| """For the given user_id/conn_id/token, return whether we have | ||
| previously sent the room down | ||
| async def get_per_connection_state( | ||
| self, | ||
| sync_config: SlidingSyncConfig, | ||
| from_token: Optional[SlidingSyncStreamToken], | ||
| ) -> PerConnectionState: | ||
| """Fetch the per-connection state for the token. | ||
|
|
||
| Raises: | ||
| SlidingSyncUnknownPosition if the connection_token is unknown | ||
| """ | ||
| if from_token is None: | ||
| return PerConnectionState() | ||
|
|
||
| connection_position = from_token.connection_position | ||
| if connection_position == 0: | ||
| # Initial sync (request without a `from_token`) starts at `0` so | ||
| # there is no existing per-connection state | ||
| return PerConnectionState() | ||
|
|
||
| conn_key = self._get_connection_key(sync_config) | ||
| sync_statuses = self._connections.setdefault(conn_key, {}) | ||
| room_status = sync_statuses.get(connection_token, {}).get( | ||
| room_id, HAVE_SENT_ROOM_NEVER | ||
| ) | ||
| sync_statuses = self._connections.get(conn_key, {}) | ||
| connection_state = sync_statuses.get(connection_position) | ||
|
|
||
| return room_status | ||
| if connection_state is None: | ||
| raise SlidingSyncUnknownPosition() | ||
|
|
||
| return connection_state | ||
|
|
||
| @trace | ||
| async def record_rooms( | ||
| async def record_new_state( | ||
| self, | ||
| sync_config: SlidingSyncConfig, | ||
| from_token: Optional[SlidingSyncStreamToken], | ||
| *, | ||
| sent_room_ids: StrCollection, | ||
| unsent_room_ids: StrCollection, | ||
| per_connection_state: MutablePerConnectionState, | ||
| ) -> int: | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Record which rooms we have/haven't sent down in a new response | ||
|
|
||
| Attributes: | ||
| sync_config | ||
| from_token: The since token from the request, if any | ||
| sent_room_ids: The set of room IDs that we have sent down as | ||
| part of this request (only needs to be ones we didn't | ||
| previously sent down). | ||
| unsent_room_ids: The set of room IDs that have had updates | ||
| since the `from_token`, but which were not included in | ||
| this request | ||
| """Record updated per-connection state, returning the connection | ||
| position associated with the new state. | ||
|
|
||
| If there are no changes to the state this may return the same token as | ||
| the existing per-connection state. | ||
| """ | ||
| prev_connection_token = 0 | ||
| if from_token is not None: | ||
| prev_connection_token = from_token.connection_position | ||
|
|
||
| # If there are no changes then this is a noop. | ||
| if not sent_room_ids and not unsent_room_ids: | ||
| if not per_connection_state.has_updates(): | ||
| return prev_connection_token | ||
|
|
||
| conn_key = self._get_connection_key(sync_config) | ||
|
|
@@ -3130,42 +3253,9 @@ async def record_rooms( | |
| new_store_token = prev_connection_token + 1 | ||
| sync_statuses.pop(new_store_token, None) | ||
|
|
||
| # Copy over and update the room mappings. | ||
| new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) | ||
|
|
||
| # Whether we have updated the `new_room_statuses`, if we don't by the | ||
| # end we can treat this as a noop. | ||
| have_updated = False | ||
| for room_id in sent_room_ids: | ||
| new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE | ||
| have_updated = True | ||
|
|
||
| # Whether we add/update the entries for unsent rooms depends on the | ||
| # existing entry: | ||
| # - LIVE: We have previously sent down everything up to | ||
| # `last_room_token, so we update the entry to be `PREVIOUSLY` with | ||
| # `last_room_token`. | ||
| # - PREVIOUSLY: We have previously sent down everything up to *a* | ||
| # given token, so we don't need to update the entry. | ||
| # - NEVER: We have never previously sent down the room, and we haven't | ||
| # sent anything down this time either so we leave it as NEVER. | ||
|
|
||
| # Work out the new state for unsent rooms that were `LIVE`. | ||
| if from_token: | ||
| new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) | ||
| else: | ||
| new_unsent_state = HAVE_SENT_ROOM_NEVER | ||
|
|
||
| for room_id in unsent_room_ids: | ||
| prev_state = new_room_statuses.get(room_id) | ||
| if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: | ||
| new_room_statuses[room_id] = new_unsent_state | ||
| have_updated = True | ||
|
|
||
| if not have_updated: | ||
| return prev_connection_token | ||
|
|
||
| sync_statuses[new_store_token] = new_room_statuses | ||
| sync_statuses[new_store_token] = PerConnectionState( | ||
| rooms=per_connection_state.rooms.copy(), | ||
| ) | ||
|
|
||
| return new_store_token | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.