-
Notifications
You must be signed in to change notification settings - Fork 412
Sliding Sync: Use stream_ordering based timeline pagination for incremental sync
#17510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
75e464b
c5966e5
3423f83
f781e5b
4aee1ab
cba4664
d6dd34f
3075a15
d67c9b5
efcc915
0e12dde
6231fb0
c5d0998
3540ac7
f27e145
a67b573
0874a2e
c98e8b1
4888d44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1855,18 +1855,19 @@ async def get_room_sync_data( | |
| room_membership_for_user_at_to_token.event_pos.to_room_stream_token() | ||
| ) | ||
|
|
||
| timeline_events, new_room_key = await self.store.paginate_room_events( | ||
| room_id=room_id, | ||
| # The bounds are reversed so we can paginate backwards | ||
| # (from newer to older events) starting at to_bound. | ||
| # This ensures we fill the `limit` with the newest events first, | ||
| from_key=to_bound, | ||
| to_key=from_bound, | ||
| direction=Direction.BACKWARDS, | ||
| # We add one so we can determine if there are enough events to saturate | ||
| # the limit or not (see `limited`) | ||
| limit=room_sync_config.timeline_limit + 1, | ||
| event_filter=None, | ||
| timeline_events, new_room_key = ( | ||
| await self.store.get_room_events_stream_for_room( | ||
|
||
| room_id=room_id, | ||
| # The bounds are reversed so we can paginate backwards | ||
| # (from newer to older events) starting at to_bound. | ||
| # This ensures we fill the `limit` with the newest events first, | ||
| from_key=to_bound, | ||
| to_key=from_bound, | ||
| direction=Direction.BACKWARDS, | ||
| # We add one so we can determine if there are enough events to saturate | ||
| # the limit or not (see `limited`) | ||
| limit=room_sync_config.timeline_limit + 1, | ||
| ) | ||
|
||
| ) | ||
|
|
||
| # We want to return the events in ascending order (the last event is the | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,7 @@ | |
|
|
||
| from synapse.api.constants import ( | ||
| AccountDataTypes, | ||
| Direction, | ||
| EventContentFields, | ||
| EventTypes, | ||
| JoinRules, | ||
|
|
@@ -883,15 +884,20 @@ async def _load_filtered_recents( | |
| # that have happened since `since_key` up to `end_key`, so we | ||
| # can just use `get_room_events_stream_for_room`. | ||
| # Otherwise, we want to return the last N events in the room | ||
| # in topological ordering. | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # in `stream_ordering`. | ||
| if since_key: | ||
| events, end_key = await self.store.get_room_events_stream_for_room( | ||
| room_id, | ||
| limit=load_limit + 1, | ||
| from_key=since_key, | ||
| to_key=end_key, | ||
| from_key=end_key, | ||
| to_key=since_key, | ||
| direction=Direction.BACKWARDS, | ||
| ) | ||
| # We want to return the events in ascending order (the last event is the | ||
| # most recent). | ||
| events = events.reverse() | ||
|
||
| else: | ||
| # TODO: This should return events in `stream_ordering` order | ||
| events, end_key = await self.store.get_recent_events_for_room( | ||
| room_id, limit=load_limit + 1, end_token=end_key | ||
| ) | ||
|
|
@@ -2641,9 +2647,10 @@ async def _get_room_changes_for_incremental_sync( | |
| # a "gap" in the timeline, as described by the spec for /sync. | ||
| room_to_events = await self.store.get_room_events_stream_for_rooms( | ||
| room_ids=sync_result_builder.joined_room_ids, | ||
| from_key=since_token.room_key, | ||
| to_key=now_token.room_key, | ||
| from_key=now_token.room_key, | ||
| to_key=since_token.room_key, | ||
| limit=timeline_limit + 1, | ||
| direction=Direction.BACKWARDS, | ||
| ) | ||
|
|
||
| # We loop through all room ids, even if there are no new events, in case | ||
|
|
@@ -2654,6 +2661,9 @@ async def _get_room_changes_for_incremental_sync( | |
| newly_joined = room_id in newly_joined_rooms | ||
| if room_entry: | ||
| events, start_key = room_entry | ||
| # We want to return the events in ascending order (the last event is the | ||
| # most recent). | ||
| events = events.reverse() | ||
|
|
||
| prev_batch_token = now_token.copy_and_replace( | ||
| StreamKeyType.ROOM, start_key | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.