Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit a564b92

Browse files
authored
Convert *StreamRow classes to inner classes (#7116)
This just helps keep the rows closer to their streams, so that it's easier to see what the format of each stream is.
1 parent 5126cb1 commit a564b92

File tree

6 files changed

+106
-100
lines changed

6 files changed

+106
-100
lines changed

changelog.d/7116.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Convert `*StreamRow` classes to inner classes.

synapse/app/generic_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ def process_replication_rows(self, stream_name, token, rows):
804804
async def _on_new_receipts(self, rows):
805805
"""
806806
Args:
807-
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
807+
rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
808808
new receipts to be processed
809809
"""
810810
for receipt in rows:

synapse/federation/send_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
477477
478478
Args:
479479
transaction_queue (FederationSender)
480-
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
480+
rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
481481
"""
482482

483483
# The federation stream contains a bunch of different types of

synapse/replication/tcp/streams/_base.py

Lines changed: 93 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -28,94 +28,6 @@
2828

2929
MAX_EVENTS_BEHIND = 500000
3030

31-
BackfillStreamRow = namedtuple(
32-
"BackfillStreamRow",
33-
(
34-
"event_id", # str
35-
"room_id", # str
36-
"type", # str
37-
"state_key", # str, optional
38-
"redacts", # str, optional
39-
"relates_to", # str, optional
40-
),
41-
)
42-
PresenceStreamRow = namedtuple(
43-
"PresenceStreamRow",
44-
(
45-
"user_id", # str
46-
"state", # str
47-
"last_active_ts", # int
48-
"last_federation_update_ts", # int
49-
"last_user_sync_ts", # int
50-
"status_msg", # str
51-
"currently_active", # bool
52-
),
53-
)
54-
TypingStreamRow = namedtuple(
55-
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
56-
)
57-
ReceiptsStreamRow = namedtuple(
58-
"ReceiptsStreamRow",
59-
(
60-
"room_id", # str
61-
"receipt_type", # str
62-
"user_id", # str
63-
"event_id", # str
64-
"data", # dict
65-
),
66-
)
67-
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
68-
PushersStreamRow = namedtuple(
69-
"PushersStreamRow",
70-
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
71-
)
72-
73-
74-
@attr.s
75-
class CachesStreamRow:
76-
"""Stream to inform workers they should invalidate their cache.
77-
78-
Attributes:
79-
cache_func: Name of the cached function.
80-
keys: The entry in the cache to invalidate. If None then will
81-
invalidate all.
82-
invalidation_ts: Timestamp of when the invalidation took place.
83-
"""
84-
85-
cache_func = attr.ib(type=str)
86-
keys = attr.ib(type=Optional[List[Any]])
87-
invalidation_ts = attr.ib(type=int)
88-
89-
90-
PublicRoomsStreamRow = namedtuple(
91-
"PublicRoomsStreamRow",
92-
(
93-
"room_id", # str
94-
"visibility", # str
95-
"appservice_id", # str, optional
96-
"network_id", # str, optional
97-
),
98-
)
99-
100-
101-
@attr.s
102-
class DeviceListsStreamRow:
103-
entity = attr.ib(type=str)
104-
105-
106-
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
107-
TagAccountDataStreamRow = namedtuple(
108-
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
109-
)
110-
AccountDataStreamRow = namedtuple(
111-
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
112-
)
113-
GroupsStreamRow = namedtuple(
114-
"GroupsStreamRow",
115-
("group_id", "user_id", "type", "content"), # str # str # str # dict
116-
)
117-
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
118-
11931

12032
class Stream(object):
12133
"""Base class for the streams.
@@ -234,6 +146,18 @@ class BackfillStream(Stream):
234146
or it went from being an outlier to not.
235147
"""
236148

149+
BackfillStreamRow = namedtuple(
150+
"BackfillStreamRow",
151+
(
152+
"event_id", # str
153+
"room_id", # str
154+
"type", # str
155+
"state_key", # str, optional
156+
"redacts", # str, optional
157+
"relates_to", # str, optional
158+
),
159+
)
160+
237161
NAME = "backfill"
238162
ROW_TYPE = BackfillStreamRow
239163

@@ -246,6 +170,19 @@ def __init__(self, hs):
246170

247171

248172
class PresenceStream(Stream):
173+
PresenceStreamRow = namedtuple(
174+
"PresenceStreamRow",
175+
(
176+
"user_id", # str
177+
"state", # str
178+
"last_active_ts", # int
179+
"last_federation_update_ts", # int
180+
"last_user_sync_ts", # int
181+
"status_msg", # str
182+
"currently_active", # bool
183+
),
184+
)
185+
249186
NAME = "presence"
250187
ROW_TYPE = PresenceStreamRow
251188

@@ -260,6 +197,10 @@ def __init__(self, hs):
260197

261198

262199
class TypingStream(Stream):
200+
TypingStreamRow = namedtuple(
201+
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
202+
)
203+
263204
NAME = "typing"
264205
ROW_TYPE = TypingStreamRow
265206

@@ -273,6 +214,17 @@ def __init__(self, hs):
273214

274215

275216
class ReceiptsStream(Stream):
217+
ReceiptsStreamRow = namedtuple(
218+
"ReceiptsStreamRow",
219+
(
220+
"room_id", # str
221+
"receipt_type", # str
222+
"user_id", # str
223+
"event_id", # str
224+
"data", # dict
225+
),
226+
)
227+
276228
NAME = "receipts"
277229
ROW_TYPE = ReceiptsStreamRow
278230

@@ -289,6 +241,8 @@ class PushRulesStream(Stream):
289241
"""A user has changed their push rules
290242
"""
291243

244+
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
245+
292246
NAME = "push_rules"
293247
ROW_TYPE = PushRulesStreamRow
294248

@@ -309,6 +263,11 @@ class PushersStream(Stream):
309263
"""A user has added/changed/removed a pusher
310264
"""
311265

266+
PushersStreamRow = namedtuple(
267+
"PushersStreamRow",
268+
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
269+
)
270+
312271
NAME = "pushers"
313272
ROW_TYPE = PushersStreamRow
314273

@@ -326,6 +285,21 @@ class CachesStream(Stream):
326285
the cache on the workers
327286
"""
328287

288+
@attr.s
289+
class CachesStreamRow:
290+
"""Stream to inform workers they should invalidate their cache.
291+
292+
Attributes:
293+
cache_func: Name of the cached function.
294+
keys: The entry in the cache to invalidate. If None then will
295+
invalidate all.
296+
invalidation_ts: Timestamp of when the invalidation took place.
297+
"""
298+
299+
cache_func = attr.ib(type=str)
300+
keys = attr.ib(type=Optional[List[Any]])
301+
invalidation_ts = attr.ib(type=int)
302+
329303
NAME = "caches"
330304
ROW_TYPE = CachesStreamRow
331305

@@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
342316
"""The public rooms list changed
343317
"""
344318

319+
PublicRoomsStreamRow = namedtuple(
320+
"PublicRoomsStreamRow",
321+
(
322+
"room_id", # str
323+
"visibility", # str
324+
"appservice_id", # str, optional
325+
"network_id", # str, optional
326+
),
327+
)
328+
345329
NAME = "public_rooms"
346330
ROW_TYPE = PublicRoomsStreamRow
347331

@@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
359343
told about a device update.
360344
"""
361345

346+
@attr.s
347+
class DeviceListsStreamRow:
348+
entity = attr.ib(type=str)
349+
362350
NAME = "device_lists"
363351
ROW_TYPE = DeviceListsStreamRow
364352

@@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
375363
"""New to_device messages for a client
376364
"""
377365

366+
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
367+
378368
NAME = "to_device"
379369
ROW_TYPE = ToDeviceStreamRow
380370

@@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
391381
"""Someone added/removed a tag for a room
392382
"""
393383

384+
TagAccountDataStreamRow = namedtuple(
385+
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
386+
)
387+
394388
NAME = "tag_account_data"
395389
ROW_TYPE = TagAccountDataStreamRow
396390

@@ -407,6 +401,10 @@ class AccountDataStream(Stream):
407401
"""Global or per room account data was changed
408402
"""
409403

404+
AccountDataStreamRow = namedtuple(
405+
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
406+
)
407+
410408
NAME = "account_data"
411409
ROW_TYPE = AccountDataStreamRow
412410

@@ -432,6 +430,11 @@ async def update_function(self, from_token, to_token, limit):
432430

433431

434432
class GroupServerStream(Stream):
433+
GroupsStreamRow = namedtuple(
434+
"GroupsStreamRow",
435+
("group_id", "user_id", "type", "content"), # str # str # str # dict
436+
)
437+
435438
NAME = "groups"
436439
ROW_TYPE = GroupsStreamRow
437440

@@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
448451
"""A user has signed their own device with their user-signing key
449452
"""
450453

454+
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
455+
451456
NAME = "user_signature"
452457
ROW_TYPE = UserSignatureStreamRow
453458

synapse/replication/tcp/streams/federation.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,20 @@
1717

1818
from ._base import Stream
1919

20-
FederationStreamRow = namedtuple(
21-
"FederationStreamRow",
22-
(
23-
"type", # str, the type of data as defined in the BaseFederationRows
24-
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
25-
),
26-
)
27-
2820

2921
class FederationStream(Stream):
3022
"""Data to be sent over federation. Only available when master has federation
3123
sending disabled.
3224
"""
3325

26+
FederationStreamRow = namedtuple(
27+
"FederationStreamRow",
28+
(
29+
"type", # str, the type of data as defined in the BaseFederationRows
30+
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
31+
),
32+
)
33+
3434
NAME = "federation"
3535
ROW_TYPE = FederationStreamRow
3636

tests/replication/tcp/streams/test_receipts.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15-
from synapse.replication.tcp.streams._base import ReceiptsStreamRow
15+
from synapse.replication.tcp.streams._base import ReceiptsStream
1616

1717
from tests.replication.tcp.streams._base import BaseStreamTestCase
1818

@@ -38,7 +38,7 @@ def test_receipt(self):
3838
rdata_rows = self.test_handler.received_rdata_rows
3939
self.assertEqual(1, len(rdata_rows))
4040
self.assertEqual(rdata_rows[0][0], "receipts")
41-
row = rdata_rows[0][2] # type: ReceiptsStreamRow
41+
row = rdata_rows[0][2] # type: ReceiptsStream.ReceiptsStreamRow
4242
self.assertEqual(ROOM_ID, row.room_id)
4343
self.assertEqual("m.read", row.receipt_type)
4444
self.assertEqual(USER_ID, row.user_id)

0 commit comments

Comments
 (0)