Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 4f21c33

Browse files
authored
Remove usage of "conn_id" for presence. (#7128)
* Remove `conn_id` usage for UserSyncCommand. Each tcp replication connection is assigned a "conn_id", which is used to give an ID to a remotely connected worker. In a redis world, there will no longer be a one to one mapping between connection and instance, so instead we need to replace such usages with an ID generated by the remote instances and included in the replicaiton commands. This really only effects UserSyncCommand. * Add CLEAR_USER_SYNCS command that is sent on shutdown. This should help with the case where a synchrotron gets restarted gracefully, rather than rely on 5 minute timeout.
1 parent 07569f2 commit 4f21c33

File tree

9 files changed

+86
-22
lines changed

9 files changed

+86
-22
lines changed

changelog.d/7128.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add explicit `instance_id` for USER_SYNC commands and remove implicit `conn_id` usage.

docs/tcp_replication.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ Asks the server for the current position of all streams.
198198

199199
A user has started or stopped syncing
200200

201+
#### CLEAR_USER_SYNC (C)
202+
203+
The server should clear all associated user sync data from the worker.
204+
205+
This is used when a worker is shutting down.
206+
201207
#### FEDERATION_ACK (C)
202208

203209
Acknowledge receipt of some federation data

synapse/app/generic_worker.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
from synapse.replication.slave.storage.room import RoomStore
6666
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
6767
from synapse.replication.tcp.client import ReplicationClientHandler
68+
from synapse.replication.tcp.commands import ClearUserSyncsCommand
6869
from synapse.replication.tcp.streams import (
6970
AccountDataStream,
7071
DeviceListsStream,
@@ -124,7 +125,6 @@
124125
from synapse.util.async_helpers import Linearizer
125126
from synapse.util.httpresourcetree import create_resource_tree
126127
from synapse.util.manhole import manhole
127-
from synapse.util.stringutils import random_string
128128
from synapse.util.versionstring import get_version_string
129129

130130
logger = logging.getLogger("synapse.app.generic_worker")
@@ -233,6 +233,7 @@ def __init__(self, hs):
233233
self.user_to_num_current_syncs = {}
234234
self.clock = hs.get_clock()
235235
self.notifier = hs.get_notifier()
236+
self.instance_id = hs.get_instance_id()
236237

237238
active_presence = self.store.take_presence_startup_info()
238239
self.user_to_current_state = {state.user_id: state for state in active_presence}
@@ -245,13 +246,24 @@ def __init__(self, hs):
245246
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
246247
)
247248

248-
self.process_id = random_string(16)
249-
logger.info("Presence process_id is %r", self.process_id)
249+
hs.get_reactor().addSystemEventTrigger(
250+
"before",
251+
"shutdown",
252+
run_as_background_process,
253+
"generic_presence.on_shutdown",
254+
self._on_shutdown,
255+
)
256+
257+
def _on_shutdown(self):
258+
if self.hs.config.use_presence:
259+
self.hs.get_tcp_replication().send_command(
260+
ClearUserSyncsCommand(self.instance_id)
261+
)
250262

251263
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
252264
if self.hs.config.use_presence:
253265
self.hs.get_tcp_replication().send_user_sync(
254-
user_id, is_syncing, last_sync_ms
266+
self.instance_id, user_id, is_syncing, last_sync_ms
255267
)
256268

257269
def mark_as_coming_online(self, user_id):

synapse/replication/tcp/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,12 @@ def send_federation_ack(self, token):
189189
"""
190190
self.send_command(FederationAckCommand(token))
191191

192-
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
192+
def send_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
193193
"""Poke the master that a user has started/stopped syncing.
194194
"""
195-
self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms))
195+
self.send_command(
196+
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
197+
)
196198

197199
def send_remove_pusher(self, app_id, push_key, user_id):
198200
"""Poke the master to remove a pusher for a user

synapse/replication/tcp/commands.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,37 +207,63 @@ class UserSyncCommand(Command):
207207
208208
Format::
209209
210-
USER_SYNC <user_id> <state> <last_sync_ms>
210+
USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>
211211
212212
Where <state> is either "start" or "stop"
213213
"""
214214

215215
NAME = "USER_SYNC"
216216

217-
def __init__(self, user_id, is_syncing, last_sync_ms):
217+
def __init__(self, instance_id, user_id, is_syncing, last_sync_ms):
218+
self.instance_id = instance_id
218219
self.user_id = user_id
219220
self.is_syncing = is_syncing
220221
self.last_sync_ms = last_sync_ms
221222

222223
@classmethod
223224
def from_line(cls, line):
224-
user_id, state, last_sync_ms = line.split(" ", 2)
225+
instance_id, user_id, state, last_sync_ms = line.split(" ", 3)
225226

226227
if state not in ("start", "end"):
227228
raise Exception("Invalid USER_SYNC state %r" % (state,))
228229

229-
return cls(user_id, state == "start", int(last_sync_ms))
230+
return cls(instance_id, user_id, state == "start", int(last_sync_ms))
230231

231232
def to_line(self):
232233
return " ".join(
233234
(
235+
self.instance_id,
234236
self.user_id,
235237
"start" if self.is_syncing else "end",
236238
str(self.last_sync_ms),
237239
)
238240
)
239241

240242

243+
class ClearUserSyncsCommand(Command):
244+
"""Sent by the client to inform the server that it should drop all
245+
information about syncing users sent by the client.
246+
247+
Mainly used when client is about to shut down.
248+
249+
Format::
250+
251+
CLEAR_USER_SYNC <instance_id>
252+
"""
253+
254+
NAME = "CLEAR_USER_SYNC"
255+
256+
def __init__(self, instance_id):
257+
self.instance_id = instance_id
258+
259+
@classmethod
260+
def from_line(cls, line):
261+
return cls(line)
262+
263+
def to_line(self):
264+
return self.instance_id
265+
266+
241267
class FederationAckCommand(Command):
242268
"""Sent by the client when it has processed up to a given point in the
243269
federation stream. This allows the master to drop in-memory caches of the
@@ -398,6 +424,7 @@ class RemoteServerUpCommand(Command):
398424
InvalidateCacheCommand,
399425
UserIpCommand,
400426
RemoteServerUpCommand,
427+
ClearUserSyncsCommand,
401428
) # type: Tuple[Type[Command], ...]
402429

403430
# Map of command name to command type.
@@ -420,6 +447,7 @@ class RemoteServerUpCommand(Command):
420447
ReplicateCommand.NAME,
421448
PingCommand.NAME,
422449
UserSyncCommand.NAME,
450+
ClearUserSyncsCommand.NAME,
423451
FederationAckCommand.NAME,
424452
RemovePusherCommand.NAME,
425453
InvalidateCacheCommand.NAME,

synapse/replication/tcp/protocol.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,9 +423,12 @@ async def on_NAME(self, cmd):
423423

424424
async def on_USER_SYNC(self, cmd):
425425
await self.streamer.on_user_sync(
426-
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
426+
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
427427
)
428428

429+
async def on_CLEAR_USER_SYNC(self, cmd):
430+
await self.streamer.on_clear_user_syncs(cmd.instance_id)
431+
429432
async def on_REPLICATE(self, cmd):
430433
# Subscribe to all streams we're publishing to.
431434
for stream_name in self.streamer.streams_by_name:
@@ -551,6 +554,8 @@ def __init__(
551554
):
552555
BaseReplicationStreamProtocol.__init__(self, clock)
553556

557+
self.instance_id = hs.get_instance_id()
558+
554559
self.client_name = client_name
555560
self.server_name = server_name
556561
self.handler = handler
@@ -580,7 +585,7 @@ def connectionMade(self):
580585
currently_syncing = self.handler.get_currently_syncing_users()
581586
now = self.clock.time_msec()
582587
for user_id in currently_syncing:
583-
self.send_command(UserSyncCommand(user_id, True, now))
588+
self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))
584589

585590
# We've now finished connecting to so inform the client handler
586591
self.handler.update_connection(self)

synapse/replication/tcp/resource.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -251,14 +251,19 @@ def federation_ack(self, token):
251251
self.federation_sender.federation_ack(token)
252252

253253
@measure_func("repl.on_user_sync")
254-
async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
254+
async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
255255
"""A client has started/stopped syncing on a worker.
256256
"""
257257
user_sync_counter.inc()
258258
await self.presence_handler.update_external_syncs_row(
259-
conn_id, user_id, is_syncing, last_sync_ms
259+
instance_id, user_id, is_syncing, last_sync_ms
260260
)
261261

262+
async def on_clear_user_syncs(self, instance_id):
263+
"""A replication client wants us to drop all their UserSync data.
264+
"""
265+
await self.presence_handler.update_external_syncs_clear(instance_id)
266+
262267
@measure_func("repl.on_remove_pusher")
263268
async def on_remove_pusher(self, app_id, push_key, user_id):
264269
"""A client has asked us to remove a pusher
@@ -321,14 +326,6 @@ def lost_connection(self, connection):
321326
except ValueError:
322327
pass
323328

324-
# We need to tell the presence handler that the connection has been
325-
# lost so that it can handle any ongoing syncs on that connection.
326-
run_as_background_process(
327-
"update_external_syncs_clear",
328-
self.presence_handler.update_external_syncs_clear,
329-
connection.conn_id,
330-
)
331-
332329

333330
def _batch_updates(updates):
334331
"""Takes a list of updates of form [(token, row)] and sets the token to

synapse/server.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
from synapse.streams.events import EventSources
104104
from synapse.util import Clock
105105
from synapse.util.distributor import Distributor
106+
from synapse.util.stringutils import random_string
106107

107108
logger = logging.getLogger(__name__)
108109

@@ -230,6 +231,8 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
230231
self._listening_services = []
231232
self.start_time = None
232233

234+
self.instance_id = random_string(5)
235+
233236
self.clock = Clock(reactor)
234237
self.distributor = Distributor()
235238
self.ratelimiter = Ratelimiter()
@@ -242,6 +245,14 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
242245
for depname in kwargs:
243246
setattr(self, depname, kwargs[depname])
244247

248+
def get_instance_id(self):
249+
"""A unique ID for this synapse process instance.
250+
251+
This is used to distinguish running instances in worker-based
252+
deployments.
253+
"""
254+
return self.instance_id
255+
245256
def setup(self):
246257
logger.info("Setting up.")
247258
self.start_time = int(self.get_clock().time())

synapse/server.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,5 @@ class HomeServer(object):
114114
pass
115115
def is_mine_id(self, domain_id: str) -> bool:
116116
pass
117+
def get_instance_id(self) -> str:
118+
pass

0 commit comments

Comments
 (0)