Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 46 additions & 26 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import contextlib
import logging
import sys
from typing import Iterable
from typing import Dict, Iterable

from typing_extensions import ContextManager

from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
Expand Down Expand Up @@ -222,6 +224,13 @@ async def on_POST(self, request, device_id):
return 200, {"one_time_key_counts": result}


class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""

def __exit__(self, exc_type, exc_val, exc_tb):
pass


UPDATE_SYNCING_USERS_MS = 10 * 1000


Expand All @@ -231,7 +240,13 @@ def __init__(self, hs):
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.user_to_num_current_syncs = {}

self._presence_enabled = hs.config.use_presence

# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little confusing to me that GenericWorkerPresence has _user_to_num_current_syncs and PresenceHandler has user_to_num_current_syncs -- might be reasonable to abstract this? They also both use the code from the get_currently_syncing_users_for_replication method, but one inlines it? (I see that they need different implementations of that method...)


self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

Expand All @@ -252,13 +267,13 @@ def __init__(self, hs):
)

def _on_shutdown(self):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
Expand Down Expand Up @@ -300,24 +315,33 @@ def set_state(self, user, state, ignore_status_msg=False):
# TODO Hows this supposed to work?
return defer.succeed(None)

def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
"""Record that a user is syncing.

Called by the sync and events servlets to record that a user has connected to
this worker and is waiting for some events.
"""
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

# If we went from no in flight sync to some, notify replication
if self.user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1
Comment on lines +329 to +330
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern usually pops out to me as a good opportunity to use defaultdict(int). It looks like it only occurs twice -- here and similar code in PresenceHandler though so not sure it would be worth it.


# If we went from no in flight sync to some, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)

def _end():
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1

# If we went from one in flight sync to non, notify replication
if self.user_to_num_current_syncs[user_id] == 0:
if self._user_to_num_current_syncs[user_id] == 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't remove the entry from _user_to_num_current_syncs at this point? Is it expected that they'll come back (and just cause churn if we remove it)? Removing them would simplify get_currently_syncing_users_for_replication a bit.

self.mark_as_going_offline(user_id)

@contextlib.contextmanager
Expand All @@ -327,7 +351,7 @@ def _user_syncing():
finally:
_end()

return defer.succeed(_user_syncing())
return _user_syncing()

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
Expand Down Expand Up @@ -362,15 +386,12 @@ def process_replication_rows(self, token, rows):
stream_id = token
yield self.notify_from_replication(states, stream_id)

def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count > 0
]
else:
return set()
def get_currently_syncing_users(self) -> Set[str]:
return {
user_id
for user_id, count in self._user_to_num_current_syncs.items()
if count > 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PresenceHandler code this is simply if count, might make sense to keep them consistent.

}


class GenericWorkerTyping(object):
Expand Down Expand Up @@ -612,8 +633,7 @@ def __init__(self, hs):

self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
# NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()

self.notify_pushers = hs.config.start_pushers
Expand Down