Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 538e399

Browse files
author
David Robertson
committed
Track un-partial-state stream in sync tokens
So that we can work out which rooms have become fully-stated during a given sync period.
1 parent 12c11ae commit 538e399

File tree

5 files changed

+19
-11
lines changed

5 files changed

+19
-11
lines changed

synapse/storage/databases/main/relations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
292292
to_device_key=0,
293293
device_list_key=0,
294294
groups_key=0,
295+
un_partial_stated_rooms_key=0,
295296
)
296297

297298
return events[:limit], next_token

synapse/streams/events.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken:
5858
push_rules_key = self.store.get_max_push_rules_stream_id()
5959
to_device_key = self.store.get_to_device_stream_token()
6060
device_list_key = self.store.get_device_stream_token()
61+
un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token()
6162

6263
token = StreamToken(
6364
room_key=self.sources.room.get_current_key(),
@@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken:
7071
device_list_key=device_list_key,
7172
# Groups key is unused.
7273
groups_key=0,
74+
un_partial_stated_rooms_key=un_partial_stated_rooms_key,
7375
)
7476
return token
7577

@@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
107109
to_device_key=0,
108110
device_list_key=0,
109111
groups_key=0,
112+
un_partial_stated_rooms_key=0,
110113
)
111114
return token

synapse/types/__init__.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -634,14 +634,15 @@ class StreamKeyType:
634634
PUSH_RULES: Final = "push_rules_key"
635635
TO_DEVICE: Final = "to_device_key"
636636
DEVICE_LIST: Final = "device_list_key"
637+
UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
637638

638639

639640
@attr.s(slots=True, frozen=True, auto_attribs=True)
640641
class StreamToken:
641642
"""A collection of keys joined together by underscores in the following
642643
order and which represent the position in their respective streams.
643644
644-
ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
645+
ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
645646
1. `room_key`: `s2633508` which is a `RoomStreamToken`
646647
- `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
647648
- See the docstring for `RoomStreamToken` for more details.
@@ -653,12 +654,13 @@ class StreamToken:
653654
7. `to_device_key`: `274711`
654655
8. `device_list_key`: `265584`
655656
9. `groups_key`: `1` (note that this key is now unused)
657+
10. `un_partial_stated_rooms_key`: `379`
656658
657659
You can see how many of these keys correspond to the various
658660
fields in a "/sync" response:
659661
```json
660662
{
661-
"next_batch": "s12_4_0_1_1_1_1_4_1",
663+
"next_batch": "s12_4_0_1_1_1_1_4_1_1",
662664
"presence": {
663665
"events": []
664666
},
@@ -670,7 +672,7 @@ class StreamToken:
670672
"!QrZlfIDQLNLdZHqTnt:hs1": {
671673
"timeline": {
672674
"events": [],
673-
"prev_batch": "s10_4_0_1_1_1_1_4_1",
675+
"prev_batch": "s10_4_0_1_1_1_1_4_1_1",
674676
"limited": false
675677
},
676678
"state": {
@@ -706,6 +708,7 @@ class StreamToken:
706708
device_list_key: int
707709
# Note that the groups key is no longer used and may have bogus values.
708710
groups_key: int
711+
un_partial_stated_rooms_key: int
709712

710713
_SEPARATOR = "_"
711714
START: ClassVar["StreamToken"]
@@ -744,6 +747,7 @@ async def to_string(self, store: "DataStore") -> str:
744747
# serialized so that there will not be confusion in the future
745748
# if additional tokens are added.
746749
str(self.groups_key),
750+
str(self.un_partial_stated_rooms_key),
747751
]
748752
)
749753

@@ -776,7 +780,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
776780
return attr.evolve(self, **{key: new_value})
777781

778782

779-
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
783+
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
780784

781785

782786
@attr.s(slots=True, frozen=True, auto_attribs=True)

tests/rest/admin/test_room.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None:
18311831

18321832
def test_topo_token_is_accepted(self) -> None:
18331833
"""Test Topo Token is accepted."""
1834-
token = "t1-0_0_0_0_0_0_0_0_0"
1834+
token = "t1-0_0_0_0_0_0_0_0_0_0"
18351835
channel = self.make_request(
18361836
"GET",
18371837
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None:
18451845

18461846
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
18471847
"""Test that stream token is accepted for forward pagination."""
1848-
token = "s0_0_0_0_0_0_0_0_0"
1848+
token = "s0_0_0_0_0_0_0_0_0_0"
18491849
channel = self.make_request(
18501850
"GET",
18511851
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),

tests/rest/client/test_rooms.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
19871987
self.room_id = self.helper.create_room_as(self.user_id)
19881988

19891989
def test_topo_token_is_accepted(self) -> None:
1990-
token = "t1-0_0_0_0_0_0_0_0_0"
1990+
token = "t1-0_0_0_0_0_0_0_0_0_0"
19911991
channel = self.make_request(
19921992
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
19931993
)
@@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None:
19981998
self.assertTrue("end" in channel.json_body)
19991999

20002000
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
2001-
token = "s0_0_0_0_0_0_0_0_0"
2001+
token = "s0_0_0_0_0_0_0_0_0_0"
20022002
channel = self.make_request(
20032003
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
20042004
)
@@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None:
27282728
"""Test that we can filter by a label on a /messages request."""
27292729
self._send_labelled_messages_in_room()
27302730

2731-
token = "s0_0_0_0_0_0_0_0_0"
2731+
token = "s0_0_0_0_0_0_0_0_0_0"
27322732
channel = self.make_request(
27332733
"GET",
27342734
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None:
27452745
"""Test that we can filter by the absence of a label on a /messages request."""
27462746
self._send_labelled_messages_in_room()
27472747

2748-
token = "s0_0_0_0_0_0_0_0_0"
2748+
token = "s0_0_0_0_0_0_0_0_0_0"
27492749
channel = self.make_request(
27502750
"GET",
27512751
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None:
27682768
"""
27692769
self._send_labelled_messages_in_room()
27702770

2771-
token = "s0_0_0_0_0_0_0_0_0"
2771+
token = "s0_0_0_0_0_0_0_0_0_0"
27722772
channel = self.make_request(
27732773
"GET",
27742774
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"

0 commit comments

Comments
 (0)