Skip to content

Commit 8bbc98e

Browse files
Use a new token format for sliding sync (#17452)
This is in preparation for adding per-connection state. --------- Co-authored-by: Eric Eastwood <[email protected]>
1 parent 4b9f4c2 commit 8bbc98e

File tree

6 files changed

+301
-208
lines changed

6 files changed

+301
-208
lines changed

changelog.d/17452.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Change sliding sync to use their own token format in preparation for storing per-connection state.

synapse/handlers/sliding_sync.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
PersistedEventPosition,
5050
Requester,
5151
RoomStreamToken,
52+
SlidingSyncStreamToken,
5253
StateMap,
5354
StreamKeyType,
5455
StreamToken,
@@ -362,7 +363,7 @@ async def wait_for_sync_for_user(
362363
self,
363364
requester: Requester,
364365
sync_config: SlidingSyncConfig,
365-
from_token: Optional[StreamToken] = None,
366+
from_token: Optional[SlidingSyncStreamToken] = None,
366367
timeout_ms: int = 0,
367368
) -> SlidingSyncResult:
368369
"""
@@ -393,7 +394,7 @@ async def wait_for_sync_for_user(
393394
# this returns false, it means we timed out waiting, and we should
394395
# just return an empty response.
395396
before_wait_ts = self.clock.time_msec()
396-
if not await self.notifier.wait_for_stream_token(from_token):
397+
if not await self.notifier.wait_for_stream_token(from_token.stream_token):
397398
logger.warning(
398399
"Timed out waiting for worker to catch up. Returning empty response"
399400
)
@@ -431,7 +432,7 @@ async def current_sync_callback(
431432
sync_config.user.to_string(),
432433
timeout_ms,
433434
current_sync_callback,
434-
from_token=from_token,
435+
from_token=from_token.stream_token,
435436
)
436437

437438
return result
@@ -440,7 +441,7 @@ async def current_sync_for_user(
440441
self,
441442
sync_config: SlidingSyncConfig,
442443
to_token: StreamToken,
443-
from_token: Optional[StreamToken] = None,
444+
from_token: Optional[SlidingSyncStreamToken] = None,
444445
) -> SlidingSyncResult:
445446
"""
446447
Generates the response body of a Sliding Sync result, represented as a
@@ -473,7 +474,7 @@ async def current_sync_for_user(
473474
await self.get_room_membership_for_user_at_to_token(
474475
user=sync_config.user,
475476
to_token=to_token,
476-
from_token=from_token,
477+
from_token=from_token.stream_token if from_token else None,
477478
)
478479
)
479480

@@ -631,8 +632,11 @@ async def handle_room(room_id: str) -> None:
631632
to_token=to_token,
632633
)
633634

635+
# TODO: Update this when we implement per-connection state
636+
connection_token = 0
637+
634638
return SlidingSyncResult(
635-
next_pos=to_token,
639+
next_pos=SlidingSyncStreamToken(to_token, connection_token),
636640
lists=lists,
637641
rooms=rooms,
638642
extensions=extensions,
@@ -1367,7 +1371,7 @@ async def get_room_sync_data(
13671371
room_id: str,
13681372
room_sync_config: RoomSyncConfig,
13691373
room_membership_for_user_at_to_token: _RoomMembershipForUser,
1370-
from_token: Optional[StreamToken],
1374+
from_token: Optional[SlidingSyncStreamToken],
13711375
to_token: StreamToken,
13721376
) -> SlidingSyncResult.RoomResult:
13731377
"""
@@ -1431,7 +1435,7 @@ async def get_room_sync_data(
14311435
# - TODO: For an incremental sync where we haven't sent it down this
14321436
# connection before
14331437
to_bound = (
1434-
from_token.room_key
1438+
from_token.stream_token.room_key
14351439
if from_token is not None
14361440
and not room_membership_for_user_at_to_token.newly_joined
14371441
else None
@@ -1498,7 +1502,9 @@ async def get_room_sync_data(
14981502
instance_name=timeline_event.internal_metadata.instance_name,
14991503
stream=timeline_event.internal_metadata.stream_ordering,
15001504
)
1501-
if persisted_position.persisted_after(from_token.room_key):
1505+
if persisted_position.persisted_after(
1506+
from_token.stream_token.room_key
1507+
):
15021508
num_live += 1
15031509
else:
15041510
# Since we're iterating over the timeline events in
@@ -1786,7 +1792,7 @@ async def get_extensions_response(
17861792
self,
17871793
sync_config: SlidingSyncConfig,
17881794
to_token: StreamToken,
1789-
from_token: Optional[StreamToken],
1795+
from_token: Optional[SlidingSyncStreamToken],
17901796
) -> SlidingSyncResult.Extensions:
17911797
"""Handle extension requests.
17921798
@@ -1900,7 +1906,7 @@ async def get_e2ee_extension_response(
19001906
sync_config: SlidingSyncConfig,
19011907
e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
19021908
to_token: StreamToken,
1903-
from_token: Optional[StreamToken],
1909+
from_token: Optional[SlidingSyncStreamToken],
19041910
) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
19051911
"""Handle E2EE device extension (MSC3884)
19061912
@@ -1922,7 +1928,7 @@ async def get_e2ee_extension_response(
19221928
# TODO: This should take into account the `from_token` and `to_token`
19231929
device_list_updates = await self.device_handler.get_user_ids_changed(
19241930
user_id=user_id,
1925-
from_token=from_token,
1931+
from_token=from_token.stream_token,
19261932
)
19271933

19281934
device_one_time_keys_count: Mapping[str, int] = {}

synapse/rest/client/sync.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
from synapse.http.site import SynapseRequest
5555
from synapse.logging.opentracing import trace_with_opname
5656
from synapse.rest.admin.experimental_features import ExperimentalFeature
57-
from synapse.types import JsonDict, Requester, StreamToken
57+
from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
5858
from synapse.types.rest.client import SlidingSyncBody
5959
from synapse.util import json_decoder
6060
from synapse.util.caches.lrucache import LruCache
@@ -889,7 +889,9 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
889889

890890
from_token = None
891891
if from_token_string is not None:
892-
from_token = await StreamToken.from_string(self.store, from_token_string)
892+
from_token = await SlidingSyncStreamToken.from_string(
893+
self.store, from_token_string
894+
)
893895

894896
# TODO: We currently don't know whether we're going to use sticky params or
895897
# maybe some filters like sync v2 where they are built up once and referenced

synapse/types/__init__.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,6 +1160,49 @@ def __str__(self) -> str:
11601160
)
11611161

11621162

1163+
@attr.s(slots=True, frozen=True, auto_attribs=True)
1164+
class SlidingSyncStreamToken:
1165+
"""The same as a `StreamToken`, but includes an extra field at the start for
1166+
the sliding sync connection token (separated by a '/'). This is used to
1167+
store per-connection state.
1168+
1169+
This then looks something like:
1170+
5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
1171+
1172+
Attributes:
1173+
stream_token: Token representing the position of all the standard
1174+
streams.
1175+
connection_position: Token used by sliding sync to track updates to any
1176+
per-connection state stored by Synapse.
1177+
"""
1178+
1179+
stream_token: StreamToken
1180+
connection_position: int
1181+
1182+
@staticmethod
1183+
@cancellable
1184+
async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
1185+
"""Creates a SlidingSyncStreamToken from its textual representation."""
1186+
try:
1187+
connection_position_str, stream_token_str = string.split("/", 1)
1188+
connection_position = int(connection_position_str)
1189+
stream_token = await StreamToken.from_string(store, stream_token_str)
1190+
1191+
return SlidingSyncStreamToken(
1192+
stream_token=stream_token,
1193+
connection_position=connection_position,
1194+
)
1195+
except CancelledError:
1196+
raise
1197+
except Exception:
1198+
raise SynapseError(400, "Invalid stream token")
1199+
1200+
async def to_string(self, store: "DataStore") -> str:
1201+
"""Serializes the token to a string"""
1202+
stream_token_str = await self.stream_token.to_string(store)
1203+
return f"{self.connection_position}/{stream_token_str}"
1204+
1205+
11631206
@attr.s(slots=True, frozen=True, auto_attribs=True)
11641207
class PersistedPosition:
11651208
"""Position of a newly persisted row with instance that persisted it."""

synapse/types/handlers/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,14 @@
3131
from pydantic import Extra
3232

3333
from synapse.events import EventBase
34-
from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, StreamToken, UserID
34+
from synapse.types import (
35+
DeviceListUpdates,
36+
JsonDict,
37+
JsonMapping,
38+
SlidingSyncStreamToken,
39+
StreamToken,
40+
UserID,
41+
)
3542
from synapse.types.rest.client import SlidingSyncBody
3643

3744
if TYPE_CHECKING:
@@ -329,7 +336,7 @@ def __bool__(self) -> bool:
329336
def __bool__(self) -> bool:
330337
return bool(self.to_device or self.e2ee)
331338

332-
next_pos: StreamToken
339+
next_pos: SlidingSyncStreamToken
333340
lists: Dict[str, SlidingWindowList]
334341
rooms: Dict[str, RoomResult]
335342
extensions: Extensions
@@ -342,7 +349,7 @@ def __bool__(self) -> bool:
342349
return bool(self.lists or self.rooms or self.extensions)
343350

344351
@staticmethod
345-
def empty(next_pos: StreamToken) -> "SlidingSyncResult":
352+
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
346353
"Return a new empty result"
347354
return SlidingSyncResult(
348355
next_pos=next_pos,

0 commit comments

Comments
 (0)