This repository was archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Comments on the /sync tentacles #11494
Merged
Merged
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
dc3d504
comment wait_for_sync_for_user
39a376d
Comment current_sync_for_user
7da68f7
Comment generate_sync_result
a91bf8b
Comment _generate_sync_entry_for_to_device
326bc74
Comment _generate_sync_entry_for_rooms
d107917
Small refactor in _get_all_rooms
fd66d02
Comment for and rename inside _have_rooms_changed
ef9482a
Comments to stream.py
f7fdab3
Typo fix
9932b6d
Comment for _get_rooms_changed
6a506c4
Fix URL line break
4a995ce
Changelog
15c022d
linty
3eca8bc
Fix a couple of typos
d9c3c15
Rewrite current_sync_for_user docstring
6908482
Use explicit inequality signs
52929d0
Itemise the 4-tuple
3954082
Expand comments in _get_rooms_changed
d0c15cc
Comment _get_all_rooms
a165e27
Explicitly describe order in get_recent_events_for_room
90dd4da
Add type annotations for the five lists
9a0a16b
Oops, re-create empty lists!
c04d7e2
Typo fixes, thanks Patrick!
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Add comments to various parts of the `/sync` handler. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -334,6 +334,19 @@ async def _wait_for_sync_for_user( | |
| full_state: bool, | ||
| cache_context: ResponseCacheContext[SyncRequestKey], | ||
| ) -> SyncResult: | ||
| """The start of the machinery that produces a /sync response. | ||
|
|
||
| See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details. | ||
|
|
||
| This method does high-level bookkeeping: | ||
| - tracking the kind of sync in the logging context | ||
| - deleting any to_device messages whose delivery has been acknowledged. | ||
| - deciding if we should dispatch an instant or delayed response | ||
| - marking the sync as being lazily loaded, if appropriate | ||
|
|
||
| Computing the body of the response begins in the next method, | ||
| `current_sync_for_user`. | ||
| """ | ||
| if since_token is None: | ||
| sync_type = "initial_sync" | ||
| elif full_state: | ||
|
|
@@ -363,7 +376,7 @@ async def _wait_for_sync_for_user( | |
| sync_config, since_token, full_state=full_state | ||
| ) | ||
| else: | ||
|
|
||
| # Otherwise, we wait for something to happen and report it to the user. | ||
| async def current_sync_callback( | ||
| before_token: StreamToken, after_token: StreamToken | ||
| ) -> SyncResult: | ||
|
|
@@ -402,7 +415,12 @@ async def current_sync_for_user( | |
| since_token: Optional[StreamToken] = None, | ||
| full_state: bool = False, | ||
| ) -> SyncResult: | ||
| """Get the sync for client needed to match what the server has now.""" | ||
| """Generates the response body of a sync result, represented as a SyncResult. | ||
|
|
||
| This is a wrapper around `generate_sync_result` which starts an open tracing | ||
| span to track the sync. See `generate_sync_result` for the next part of your | ||
| indoctrination. | ||
| """ | ||
| with start_active_span("current_sync_for_user"): | ||
| log_kv({"since_token": since_token}) | ||
| sync_result = await self.generate_sync_result( | ||
|
|
@@ -560,7 +578,7 @@ async def _load_filtered_recents( | |
| # that have happened since `since_key` up to `end_key`, so we | ||
| # can just use `get_room_events_stream_for_room`. | ||
| # Otherwise, we want to return the last N events in the room | ||
| # in toplogical ordering. | ||
| # in topological ordering. | ||
| if since_key: | ||
| events, end_key = await self.store.get_room_events_stream_for_room( | ||
| room_id, | ||
|
|
@@ -1042,7 +1060,18 @@ async def generate_sync_result( | |
| since_token: Optional[StreamToken] = None, | ||
| full_state: bool = False, | ||
| ) -> SyncResult: | ||
| """Generates a sync result.""" | ||
| """Generates the response body of a sync result. | ||
|
|
||
| This is represented by a `SyncResult` struct, which is built from small pieces | ||
| using a `SyncResultBuilder`. See also | ||
| https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync | ||
| The `sync_result_builder` is passed as a mutable ("inout") parameter to various | ||
| helper functions. These retrieve and process the data which forms the sync body, | ||
| often writing to the `sync_result_builder` to store their output. | ||
|
|
||
| At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` | ||
| instance to signify that the sync calculation is complete. | ||
| """ | ||
| # NB: The now_token gets changed by some of the generate_sync_* methods, | ||
| # this is due to some of the underlying streams not supporting the ability | ||
| # to query up to a given point. | ||
|
|
@@ -1344,22 +1373,30 @@ async def _generate_sync_entry_for_to_device( | |
| async def _generate_sync_entry_for_account_data( | ||
| self, sync_result_builder: "SyncResultBuilder" | ||
| ) -> Dict[str, Dict[str, JsonDict]]: | ||
| """Generates the account data portion of the sync response. Populates | ||
| `sync_result_builder` with the result. | ||
| """Generates the account data portion of the sync response. | ||
|
|
||
| Account data (called "Client Config" in the spec) can be set either globally | ||
| or for a specific room. Account data consists of a list of events which | ||
| accumulate state, much like a room. | ||
|
|
||
| This function retrieves global and per-room account data. The former is written | ||
| to the given `sync_result_builder`. The latter is returned directly, to be | ||
| later written to the `sync_result_builder` on a room-by-room basis. | ||
|
|
||
| Args: | ||
| sync_result_builder | ||
|
|
||
| Returns: | ||
| A dictionary containing the per room account data. | ||
| A dictionary whose keys (room ids) map to the per room account data for that | ||
| room. | ||
| """ | ||
| sync_config = sync_result_builder.sync_config | ||
| user_id = sync_result_builder.sync_config.user.to_string() | ||
| since_token = sync_result_builder.since_token | ||
|
|
||
| if since_token and not sync_result_builder.full_state: | ||
| ( | ||
| account_data, | ||
| global_account_data, | ||
| account_data_by_room, | ||
| ) = await self.store.get_updated_account_data_for_user( | ||
| user_id, since_token.account_data_key | ||
|
|
@@ -1370,23 +1407,23 @@ async def _generate_sync_entry_for_account_data( | |
| ) | ||
|
|
||
| if push_rules_changed: | ||
| account_data["m.push_rules"] = await self.push_rules_for_user( | ||
| global_account_data["m.push_rules"] = await self.push_rules_for_user( | ||
| sync_config.user | ||
| ) | ||
| else: | ||
| ( | ||
| account_data, | ||
| global_account_data, | ||
| account_data_by_room, | ||
| ) = await self.store.get_account_data_for_user(sync_config.user.to_string()) | ||
|
|
||
| account_data["m.push_rules"] = await self.push_rules_for_user( | ||
| global_account_data["m.push_rules"] = await self.push_rules_for_user( | ||
| sync_config.user | ||
| ) | ||
|
|
||
| account_data_for_user = await sync_config.filter_collection.filter_account_data( | ||
| [ | ||
| {"type": account_data_type, "content": content} | ||
| for account_data_type, content in account_data.items() | ||
| for account_data_type, content in global_account_data.items() | ||
| ] | ||
| ) | ||
|
|
||
|
|
@@ -1460,15 +1497,22 @@ async def _generate_sync_entry_for_rooms( | |
| """Generates the rooms portion of the sync response. Populates the | ||
| `sync_result_builder` with the result. | ||
|
|
||
| In the response that reaches the client, rooms are divided into four categories: | ||
| `invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of | ||
| room ids returned by this function. | ||
|
|
||
| Args: | ||
| sync_result_builder | ||
| account_data_by_room: Dictionary of per room account data | ||
|
|
||
| Returns: | ||
| Returns a 4-tuple of | ||
| `(newly_joined_rooms, newly_joined_or_invited_users, | ||
| newly_left_rooms, newly_left_users)` | ||
| Returns a 4-tuple whose entries are: | ||
| - newly_joined_rooms | ||
| - newly_joined_or_invited_or_knocked_users | ||
| - newly_left_rooms | ||
| - newly_left_users | ||
| """ | ||
| # Start by fetching all ephemeral events in rooms we've joined (if required). | ||
| user_id = sync_result_builder.sync_config.user.to_string() | ||
| block_all_room_ephemeral = ( | ||
| sync_result_builder.since_token is None | ||
|
|
@@ -1590,19 +1634,22 @@ async def _have_rooms_changed( | |
| ) -> bool: | ||
| """Returns whether there may be any new events that should be sent down | ||
| the sync. Returns True if there are. | ||
|
|
||
| Does not modify the `sync_result_builder`. | ||
| """ | ||
| user_id = sync_result_builder.sync_config.user.to_string() | ||
| since_token = sync_result_builder.since_token | ||
| now_token = sync_result_builder.now_token | ||
|
|
||
| assert since_token | ||
|
|
||
| # Get a list of membership change events that have happened. | ||
| rooms_changed = await self.store.get_membership_changes_for_user( | ||
| # Get a list of membership change events that have happened to the user | ||
| # requesting the sync. | ||
| membership_changes = await self.store.get_membership_changes_for_user( | ||
| user_id, since_token.room_key, now_token.room_key | ||
| ) | ||
|
|
||
| if rooms_changed: | ||
| if membership_changes: | ||
| return True | ||
|
|
||
| stream_id = since_token.room_key.stream | ||
|
|
@@ -1614,29 +1661,62 @@ async def _have_rooms_changed( | |
| async def _get_rooms_changed( | ||
| self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] | ||
| ) -> _RoomChanges: | ||
| """Gets the the changes that have happened since the last sync.""" | ||
| """Determine the changes in rooms to report to the user. | ||
|
|
||
| Ideally, we want to report all events whose stream ordering `s` lies in the | ||
| range `since_token < s <= now_token`, where the two tokens are read from the | ||
| sync_result_builder. | ||
|
|
||
| If there are too many events in that range to report, things get complicated. | ||
| In this situation we return a truncated list of the most recent events, and | ||
| indicate in the response that there is a "gap" of omitted events. Additionally: | ||
|
|
||
| - we include a "state_delta", to describe the changes in state over the gap, | ||
DMRobertson marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| - we include all membership events applying to the user making the request, | ||
| even those in the gap. | ||
|
|
||
| See the spec for the rationale: | ||
| https://spec.matrix.org/v1.1/client-server-api/#syncing | ||
|
|
||
| The sync_result_builder is not modified by this function. | ||
| """ | ||
| user_id = sync_result_builder.sync_config.user.to_string() | ||
| since_token = sync_result_builder.since_token | ||
| now_token = sync_result_builder.now_token | ||
| sync_config = sync_result_builder.sync_config | ||
|
|
||
| assert since_token | ||
|
|
||
| # Get a list of membership change events that have happened. | ||
| rooms_changed = await self.store.get_membership_changes_for_user( | ||
| # The spec | ||
| # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync | ||
| # notes that membership events need special consideration: | ||
| # | ||
| # > When a sync is limited, the server MUST return membership events for events | ||
| # > in the gap (between since and the start of the returned timeline), regardless | ||
| # > as to whether or not they are redundant. | ||
| # | ||
| # We fetch such events here, but we only seem to use them for categorising rooms | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # as newly joined, newly left, invited or knocked. | ||
|
Comment on lines
+1698
to
+1699
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a misunderstanding. Lines 1703-1705 only fetch membership events for the calling user, which is done, as you note, to figure out which rooms we have joined/left since the previous sync. It's nothing to do with returning the membership events for events in the gap (which could be from any user in the room, not just the calling user).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, this is illuminating. Will PR a correction. |
||
| # TODO: we've already called this function and run this query in | ||
| # _have_rooms_changed. We could keep the results in memory to avoid a | ||
| # second query, at the cost of more complicated source code. | ||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| membership_change_events = await self.store.get_membership_changes_for_user( | ||
| user_id, since_token.room_key, now_token.room_key | ||
| ) | ||
|
|
||
| mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} | ||
| for event in rooms_changed: | ||
| for event in membership_change_events: | ||
| mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) | ||
|
|
||
| newly_joined_rooms = [] | ||
| newly_left_rooms = [] | ||
| room_entries = [] | ||
| invited = [] | ||
| knocked = [] | ||
| newly_joined_rooms: List[str] = [] | ||
| newly_left_rooms: List[str] = [] | ||
| room_entries: List[RoomSyncResultBuilder] = [] | ||
| invited: List[InvitedSyncResult] = [] | ||
| knocked: List[KnockedSyncResult] = [] | ||
| for room_id, events in mem_change_events_by_room_id.items(): | ||
| # The body of this loop will add this room to at least one of the five lists | ||
| # above. Things get messy if you've e.g. joined, left, joined then left the | ||
| # room all in the same sync period. | ||
| logger.debug( | ||
| "Membership changes in %s: [%s]", | ||
| room_id, | ||
|
|
@@ -1781,7 +1861,9 @@ async def _get_rooms_changed( | |
|
|
||
| timeline_limit = sync_config.filter_collection.timeline_limit() | ||
|
|
||
| # Get all events for rooms we're currently joined to. | ||
| # Get all events since the `from_key` in rooms we're currently joined to. | ||
| # If there are too many, we get the most recent events only. This leaves | ||
| # a "gap" in the timeline, as described by the spec for /sync. | ||
| room_to_events = await self.store.get_room_events_stream_for_rooms( | ||
| room_ids=sync_result_builder.joined_room_ids, | ||
| from_key=since_token.room_key, | ||
|
|
@@ -1842,6 +1924,10 @@ async def _get_all_rooms( | |
| ) -> _RoomChanges: | ||
| """Returns entries for all rooms for the user. | ||
|
|
||
| Like `_get_rooms_changed`, but assumes the `since_token` is `None`. | ||
|
|
||
| This function does not modify the sync_result_builder. | ||
|
|
||
| Args: | ||
| sync_result_builder | ||
| ignored_users: Set of users ignored by user. | ||
|
|
@@ -1853,16 +1939,9 @@ async def _get_all_rooms( | |
| now_token = sync_result_builder.now_token | ||
| sync_config = sync_result_builder.sync_config | ||
|
|
||
| membership_list = ( | ||
| Membership.INVITE, | ||
| Membership.KNOCK, | ||
| Membership.JOIN, | ||
| Membership.LEAVE, | ||
| Membership.BAN, | ||
| ) | ||
|
|
||
| room_list = await self.store.get_rooms_for_local_user_where_membership_is( | ||
| user_id=user_id, membership_list=membership_list | ||
| user_id=user_id, | ||
| membership_list=Membership.LIST, | ||
| ) | ||
|
|
||
| room_entries = [] | ||
|
|
@@ -2212,8 +2291,7 @@ def _calculate_state( | |
| # to only include membership events for the senders in the timeline. | ||
| # In practice, we can do this by removing them from the p_ids list, | ||
| # which is the list of relevant state we know we have already sent to the client. | ||
| # see https://github.com/matrix-org/synapse/pull/2970 | ||
| # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 | ||
| # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 | ||
|
|
||
| if lazy_load_members: | ||
| p_ids.difference_update( | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.