Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17510.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
6 changes: 4 additions & 2 deletions docs/development/room-dag-concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and

---

- `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
- Incremental `/sync?since=xxx` returns things in the order they arrive at the server
(`stream_ordering`).
- Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in
the order determined by the event graph `(topological_ordering, stream_ordering)`.

The general idea is that, if you're following a room in real-time (i.e.
`/sync`), you probably want to see the messages as they arrive at your server,
Expand Down
10 changes: 8 additions & 2 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,14 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = await self._store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
events, _ = (
await self._store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_key,
to_key=to_key,
limit=100,
direction=Direction.FORWARDS,
)
)
if not events:
break
Expand Down
32 changes: 18 additions & 14 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,15 @@ async def get_messages(

# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
)

if pagin_config.direction == Direction.BACKWARDS:
Expand Down Expand Up @@ -582,13 +584,15 @@ async def get_messages(
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
)
else:
# Otherwise, we can backfill in the background for eventual
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ async def get_new_events(
from_key=from_key,
to_key=to_key,
limit=limit or 10,
order="ASC",
direction=Direction.FORWARDS,
)

events = list(room_events)
Expand Down
40 changes: 37 additions & 3 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.databases.main.stream import (
CurrentStateDeltaMembership,
PaginateFunction,
)
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
Expand Down Expand Up @@ -1863,10 +1866,13 @@ async def get_room_sync_data(
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
#
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
from_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
Expand Down Expand Up @@ -1927,7 +1933,36 @@ async def get_room_sync_data(
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

timeline_events, new_room_key = await self.store.paginate_room_events(
# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
# seeing it at the time). This also aligns with the order that `/messages`
# returns events in.
#
# For incremental `/sync`, we want to get all updates for rooms since
# the last `/sync` (regardless if those updates arrived late or happened
# a while ago in the past); to fetch events by `stream_ordering` (in the
# order they were received by the server).
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
#
# FIXME: Using workaround for mypy,
# https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
# https://github.com/python/mypy/issues/17479
paginate_room_events_by_topological_ordering: PaginateFunction = (
self.store.paginate_room_events_by_topological_ordering
)
paginate_room_events_by_stream_ordering: PaginateFunction = (
self.store.paginate_room_events_by_stream_ordering
)
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
timeline_events, new_room_key = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
Expand All @@ -1938,7 +1973,6 @@ async def get_room_sync_data(
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
event_filter=None,
)

# We want to return the events in ascending order (the last event is the
Expand Down
69 changes: 51 additions & 18 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

from synapse.api.constants import (
AccountDataTypes,
Direction,
EventContentFields,
EventTypes,
JoinRules,
Expand All @@ -64,6 +65,7 @@
)
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
Expand Down Expand Up @@ -879,22 +881,49 @@ async def _load_filtered_recents(
since_key = since_token.room_key

while limited and len(recents) < timeline_limit and max_repeat:
# If we have a since_key then we are trying to get any events
# that have happened since `since_key` up to `end_key`, so we
# can just use `get_room_events_stream_for_room`.
# Otherwise, we want to return the last N events in the room
# in topological ordering.
if since_key:
events, end_key = await self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = await self.store.get_recent_events_for_room(
room_id, limit=load_limit + 1, end_token=end_key
)
# For initial `/sync`, we want to view a historical section of the
# timeline; to fetch events by `topological_ordering` (best
# representation of the room DAG as others were seeing it at the time).
# This also aligns with the order that `/messages` returns events in.
#
# For incremental `/sync`, we want to get all updates for rooms since
# the last `/sync` (regardless if those updates arrived late or happened
# a while ago in the past); to fetch events by `stream_ordering` (in the
# order they were received by the server).
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
#
# FIXME: Using workaround for mypy,
# https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
# https://github.com/python/mypy/issues/17479
paginate_room_events_by_topological_ordering: PaginateFunction = (
self.store.paginate_room_events_by_topological_ordering
)
paginate_room_events_by_stream_ordering: PaginateFunction = (
self.store.paginate_room_events_by_stream_ordering
)
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if since_key is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
events, end_key = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=end_key,
to_key=since_key,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=load_limit + 1,
)
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

log_kv({"loaded_recents": len(events)})

Expand Down Expand Up @@ -2641,9 +2670,10 @@ async def _get_room_changes_for_incremental_sync(
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
from_key=now_token.room_key,
to_key=since_token.room_key,
limit=timeline_limit + 1,
direction=Direction.BACKWARDS,
)

# We loop through all room ids, even if there are no new events, in case
Expand All @@ -2654,6 +2684,9 @@ async def _get_room_changes_for_incremental_sync(
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

prev_batch_token = now_token.copy_and_replace(
StreamKeyType.ROOM, start_key
Expand Down
Loading