Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 71a1abb

Browse files
authored
Stop the master relaying USER_SYNC for other workers (#7318)
Long story short: if we're handling presence on the current worker, we shouldn't be sending USER_SYNC commands over replication. In an attempt to figure out what is going on here, I ended up refactoring some bits of the presencehandler code, so the first 4 commits here are non-functional refactors to move this code slightly closer to sanity. (There's still plenty to do here :/). Suggest reviewing individual commits. Fixes (I hope) #7257.
1 parent 841c581 commit 71a1abb

File tree

10 files changed

+199
-159
lines changed

10 files changed

+199
-159
lines changed

changelog.d/7318.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Move catchup of replication streams logic to worker.

docs/tcp_replication.md

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ Asks the server for the current position of all streams.
196196

197197
#### USER_SYNC (C)
198198

199-
A user has started or stopped syncing
199+
A user has started or stopped syncing on this process.
200200

201201
#### CLEAR_USER_SYNC (C)
202202

@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
216216

217217
Inform the server a cache should be invalidated
218218

219-
#### SYNC (S, C)
220-
221-
Used exclusively in tests
222-
223219
### REMOTE_SERVER_UP (S, C)
224220

225221
Inform other processes that a remote server may have come back online.

synapse/api/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class EventTypes(object):
9797

9898
Retention = "m.room.retention"
9999

100+
Presence = "m.presence"
101+
100102

101103
class RejectedReason(object):
102104
AUTH_ERROR = "auth_error"

synapse/app/generic_worker.py

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
import contextlib
1818
import logging
1919
import sys
20+
from typing import Dict, Iterable
21+
22+
from typing_extensions import ContextManager
2023

2124
from twisted.internet import defer, reactor
2225
from twisted.web.resource import NoResource
@@ -38,14 +41,14 @@
3841
from synapse.config.logger import setup_logging
3942
from synapse.federation import send_queue
4043
from synapse.federation.transport.server import TransportLayerServer
41-
from synapse.handlers.presence import PresenceHandler, get_interested_parties
44+
from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
4245
from synapse.http.server import JsonResource
4346
from synapse.http.servlet import RestServlet, parse_json_object_from_request
4447
from synapse.http.site import SynapseSite
4548
from synapse.logging.context import LoggingContext
4649
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
4750
from synapse.metrics.background_process_metrics import run_as_background_process
48-
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
51+
from synapse.replication.slave.storage._base import BaseSlavedStore
4952
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
5053
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
5154
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -225,23 +228,32 @@ async def on_POST(self, request, device_id):
225228
return 200, {"one_time_key_counts": result}
226229

227230

231+
class _NullContextManager(ContextManager[None]):
232+
"""A context manager which does nothing."""
233+
234+
def __exit__(self, exc_type, exc_val, exc_tb):
235+
pass
236+
237+
228238
UPDATE_SYNCING_USERS_MS = 10 * 1000
229239

230240

231-
class GenericWorkerPresence(object):
241+
class GenericWorkerPresence(BasePresenceHandler):
232242
def __init__(self, hs):
243+
super().__init__(hs)
233244
self.hs = hs
234245
self.is_mine_id = hs.is_mine_id
235246
self.http_client = hs.get_simple_http_client()
236-
self.store = hs.get_datastore()
237-
self.user_to_num_current_syncs = {}
238-
self.clock = hs.get_clock()
247+
248+
self._presence_enabled = hs.config.use_presence
249+
250+
# The number of ongoing syncs on this process, by user id.
251+
# Empty if _presence_enabled is false.
252+
self._user_to_num_current_syncs = {} # type: Dict[str, int]
253+
239254
self.notifier = hs.get_notifier()
240255
self.instance_id = hs.get_instance_id()
241256

242-
active_presence = self.store.take_presence_startup_info()
243-
self.user_to_current_state = {state.user_id: state for state in active_presence}
244-
245257
# user_id -> last_sync_ms. Lists the users that have stopped syncing
246258
# but we haven't notified the master of that yet
247259
self.users_going_offline = {}
@@ -259,13 +271,13 @@ def __init__(self, hs):
259271
)
260272

261273
def _on_shutdown(self):
262-
if self.hs.config.use_presence:
274+
if self._presence_enabled:
263275
self.hs.get_tcp_replication().send_command(
264276
ClearUserSyncsCommand(self.instance_id)
265277
)
266278

267279
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
268-
if self.hs.config.use_presence:
280+
if self._presence_enabled:
269281
self.hs.get_tcp_replication().send_user_sync(
270282
self.instance_id, user_id, is_syncing, last_sync_ms
271283
)
@@ -307,28 +319,33 @@ def set_state(self, user, state, ignore_status_msg=False):
307319
# TODO Hows this supposed to work?
308320
return defer.succeed(None)
309321

310-
get_states = __func__(PresenceHandler.get_states)
311-
get_state = __func__(PresenceHandler.get_state)
312-
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
322+
async def user_syncing(
323+
self, user_id: str, affect_presence: bool
324+
) -> ContextManager[None]:
325+
"""Record that a user is syncing.
326+
327+
Called by the sync and events servlets to record that a user has connected to
328+
this worker and is waiting for some events.
329+
"""
330+
if not affect_presence or not self._presence_enabled:
331+
return _NullContextManager()
313332

314-
def user_syncing(self, user_id, affect_presence):
315-
if affect_presence:
316-
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
317-
self.user_to_num_current_syncs[user_id] = curr_sync + 1
333+
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
334+
self._user_to_num_current_syncs[user_id] = curr_sync + 1
318335

319-
# If we went from no in flight sync to some, notify replication
320-
if self.user_to_num_current_syncs[user_id] == 1:
321-
self.mark_as_coming_online(user_id)
336+
# If we went from no in flight sync to some, notify replication
337+
if self._user_to_num_current_syncs[user_id] == 1:
338+
self.mark_as_coming_online(user_id)
322339

323340
def _end():
324341
# We check that the user_id is in user_to_num_current_syncs because
325342
# user_to_num_current_syncs may have been cleared if we are
326343
# shutting down.
327-
if affect_presence and user_id in self.user_to_num_current_syncs:
328-
self.user_to_num_current_syncs[user_id] -= 1
344+
if user_id in self._user_to_num_current_syncs:
345+
self._user_to_num_current_syncs[user_id] -= 1
329346

330347
# If we went from one in flight sync to non, notify replication
331-
if self.user_to_num_current_syncs[user_id] == 0:
348+
if self._user_to_num_current_syncs[user_id] == 0:
332349
self.mark_as_going_offline(user_id)
333350

334351
@contextlib.contextmanager
@@ -338,7 +355,7 @@ def _user_syncing():
338355
finally:
339356
_end()
340357

341-
return defer.succeed(_user_syncing())
358+
return _user_syncing()
342359

343360
@defer.inlineCallbacks
344361
def notify_from_replication(self, states, stream_id):
@@ -373,15 +390,12 @@ def process_replication_rows(self, token, rows):
373390
stream_id = token
374391
yield self.notify_from_replication(states, stream_id)
375392

376-
def get_currently_syncing_users(self):
377-
if self.hs.config.use_presence:
378-
return [
379-
user_id
380-
for user_id, count in self.user_to_num_current_syncs.items()
381-
if count > 0
382-
]
383-
else:
384-
return set()
393+
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
394+
return [
395+
user_id
396+
for user_id, count in self._user_to_num_current_syncs.items()
397+
if count > 0
398+
]
385399

386400

387401
class GenericWorkerTyping(object):
@@ -625,8 +639,7 @@ def __init__(self, hs):
625639

626640
self.store = hs.get_datastore()
627641
self.typing_handler = hs.get_typing_handler()
628-
# NB this is a SynchrotronPresence, not a normal PresenceHandler
629-
self.presence_handler = hs.get_presence_handler()
642+
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
630643
self.notifier = hs.get_notifier()
631644

632645
self.notify_pushers = hs.config.start_pushers

synapse/handlers/events.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from synapse.api.constants import EventTypes, Membership
2020
from synapse.api.errors import AuthError, SynapseError
2121
from synapse.events import EventBase
22+
from synapse.handlers.presence import format_user_presence_state
2223
from synapse.logging.utils import log_function
2324
from synapse.types import UserID
2425
from synapse.visibility import filter_events_for_client
@@ -97,6 +98,8 @@ async def get_stream(
9798
explicit_room_id=room_id,
9899
)
99100

101+
time_now = self.clock.time_msec()
102+
100103
# When the user joins a new room, or another user joins a currently
101104
# joined room, we need to send down presence for those users.
102105
to_add = []
@@ -112,19 +115,20 @@ async def get_stream(
112115
users = await self.state.get_current_users_in_room(
113116
event.room_id
114117
)
115-
states = await presence_handler.get_states(users, as_event=True)
116-
to_add.extend(states)
117118
else:
119+
users = [event.state_key]
118120

119-
ev = await presence_handler.get_state(
120-
UserID.from_string(event.state_key), as_event=True
121-
)
122-
to_add.append(ev)
121+
states = await presence_handler.get_states(users)
122+
to_add.extend(
123+
{
124+
"type": EventTypes.Presence,
125+
"content": format_user_presence_state(state, time_now),
126+
}
127+
for state in states
128+
)
123129

124130
events.extend(to_add)
125131

126-
time_now = self.clock.time_msec()
127-
128132
chunks = await self._event_serializer.serialize_events(
129133
events,
130134
time_now,

synapse/handlers/initial_sync.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,16 @@ async def get_presence():
381381
return []
382382

383383
states = await presence_handler.get_states(
384-
[m.user_id for m in room_members], as_event=True
384+
[m.user_id for m in room_members]
385385
)
386386

387-
return states
387+
return [
388+
{
389+
"type": EventTypes.Presence,
390+
"content": format_user_presence_state(s, time_now),
391+
}
392+
for s in states
393+
]
388394

389395
async def get_receipts():
390396
receipts = await self.store.get_linearized_receipts_for_room(

0 commit comments

Comments
 (0)