Skip to content

Commit 5624c8b

Browse files
authored
In sync wait for worker to catch up since token (#17215)
Otherwise things will get confused. An alternative would be to make sure that for a lagging stream we don't return anything (and make sure the returned next_batch token doesn't go backwards). But that is a faff.
1 parent 4e3868d commit 5624c8b

File tree

7 files changed

+134
-7
lines changed

7 files changed

+134
-7
lines changed

changelog.d/17215.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bug where duplicate events could be sent down sync when using workers that are overloaded.

pyproject.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,8 @@ netaddr = ">=0.7.18"
200200
# add a lower bound to the Jinja2 dependency.
201201
Jinja2 = ">=3.0"
202202
bleach = ">=1.4.3"
203-
# We use `ParamSpec` and `Concatenate`, which were added in `typing-extensions` 3.10.0.0.
204-
# Additionally we need https://github.com/python/typing/pull/817 to allow types to be
205-
# generic over ParamSpecs.
206-
typing-extensions = ">=3.10.0.1"
203+
# We use `Self`, which was added in `typing-extensions` 4.0.
204+
typing-extensions = ">=4.0"
207205
# We enforce that we have a `cryptography` version that bundles an `openssl`
208206
# with the latest security patches.
209207
cryptography = ">=3.4.7"

synapse/handlers/sync.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,23 @@ def __bool__(self) -> bool:
284284
or self.device_lists
285285
)
286286

287+
@staticmethod
288+
def empty(next_batch: StreamToken) -> "SyncResult":
289+
"Return a new empty result"
290+
return SyncResult(
291+
next_batch=next_batch,
292+
presence=[],
293+
account_data=[],
294+
joined=[],
295+
invited=[],
296+
knocked=[],
297+
archived=[],
298+
to_device=[],
299+
device_lists=DeviceListUpdates(),
300+
device_one_time_keys_count={},
301+
device_unused_fallback_key_types=[],
302+
)
303+
287304

288305
@attr.s(slots=True, frozen=True, auto_attribs=True)
289306
class E2eeSyncResult:
@@ -497,6 +514,24 @@ async def _wait_for_sync_for_user(
497514
if context:
498515
context.tag = sync_label
499516

517+
if since_token is not None:
518+
# We need to make sure this worker has caught up with the token. If
519+
# this returns false it means we timed out waiting, and we should
520+
# just return an empty response.
521+
start = self.clock.time_msec()
522+
if not await self.notifier.wait_for_stream_token(since_token):
523+
logger.warning(
524+
"Timed out waiting for worker to catch up. Returning empty response"
525+
)
526+
return SyncResult.empty(since_token)
527+
528+
# If we've spent significant time waiting to catch up, take it off
529+
# the timeout.
530+
now = self.clock.time_msec()
531+
if now - start > 1_000:
532+
timeout -= now - start
533+
timeout = max(timeout, 0)
534+
500535
# if we have a since token, delete any to-device messages before that token
501536
# (since we now know that the device has received them)
502537
if since_token is not None:

synapse/notifier.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,29 @@ async def check_for_updates(
763763

764764
return result
765765

766+
async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
767+
"""Wait for this worker to catch up with the given stream token."""
768+
769+
start = self.clock.time_msec()
770+
while True:
771+
current_token = self.event_sources.get_current_token()
772+
if stream_token.is_before_or_eq(current_token):
773+
return True
774+
775+
now = self.clock.time_msec()
776+
777+
if now - start > 10_000:
778+
return False
779+
780+
logger.info(
781+
"Waiting for current token to reach %s; currently at %s",
782+
stream_token,
783+
current_token,
784+
)
785+
786+
# TODO: be better
787+
await self.clock.sleep(0.5)
788+
766789
async def _get_room_ids(
767790
self, user: UserID, explicit_room_id: Optional[str]
768791
) -> Tuple[StrCollection, bool]:

synapse/storage/databases/main/events.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class DeltaState:
9595
to_insert: StateMap[str]
9696
no_longer_in_room: bool = False
9797

98+
def is_noop(self) -> bool:
99+
"""Whether this state delta is actually empty"""
100+
return not self.to_delete and not self.to_insert and not self.no_longer_in_room
101+
98102

99103
class PersistEventsStore:
100104
"""Contains all the functions for writing events to the database.
@@ -1017,6 +1021,9 @@ async def update_current_state(
10171021
) -> None:
10181022
"""Update the current state stored in the database for the given room"""
10191023

1024+
if state_delta.is_noop():
1025+
return
1026+
10201027
async with self._stream_id_gen.get_next() as stream_ordering:
10211028
await self.db_pool.runInteraction(
10221029
"update_current_state",

synapse/storage/databases/main/events_worker.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,11 @@ def __init__(
200200
notifier=hs.get_replication_notifier(),
201201
stream_name="events",
202202
instance_name=hs.get_instance_name(),
203-
tables=[("events", "instance_name", "stream_ordering")],
203+
tables=[
204+
("events", "instance_name", "stream_ordering"),
205+
("current_state_delta_stream", "instance_name", "stream_id"),
206+
("ex_outlier_stream", "instance_name", "event_stream_ordering"),
207+
],
204208
sequence_name="events_stream_seq",
205209
writers=hs.config.worker.writers.events,
206210
)
@@ -210,7 +214,10 @@ def __init__(
210214
notifier=hs.get_replication_notifier(),
211215
stream_name="backfill",
212216
instance_name=hs.get_instance_name(),
213-
tables=[("events", "instance_name", "stream_ordering")],
217+
tables=[
218+
("events", "instance_name", "stream_ordering"),
219+
("ex_outlier_stream", "instance_name", "event_stream_ordering"),
220+
],
214221
sequence_name="events_backfill_stream_seq",
215222
positive=False,
216223
writers=hs.config.worker.writers.events,

synapse/types/__init__.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
from immutabledict import immutabledict
4949
from signedjson.key import decode_verify_key_bytes
5050
from signedjson.types import VerifyKey
51-
from typing_extensions import TypedDict
51+
from typing_extensions import Self, TypedDict
5252
from unpaddedbase64 import decode_base64
5353
from zope.interface import Interface
5454

@@ -515,6 +515,27 @@ def get_stream_pos_for_instance(self, instance_name: str) -> int:
515515
# at `self.stream`.
516516
return self.instance_map.get(instance_name, self.stream)
517517

518+
def is_before_or_eq(self, other_token: Self) -> bool:
519+
"""Whether this token is before or equal to the other token, i.e. every
520+
constituent part is before or equal to the other.
521+
522+
Essentially it is `self <= other`.
523+
524+
Note: if `self.is_before_or_eq(other_token) is False` then that does not
525+
imply that the reverse is True.
526+
"""
527+
if self.stream > other_token.stream:
528+
return False
529+
530+
instances = self.instance_map.keys() | other_token.instance_map.keys()
531+
for instance in instances:
532+
if self.instance_map.get(
533+
instance, self.stream
534+
) > other_token.instance_map.get(instance, other_token.stream):
535+
return False
536+
537+
return True
538+
518539

519540
@attr.s(frozen=True, slots=True, order=False)
520541
class RoomStreamToken(AbstractMultiWriterStreamToken):
@@ -1008,6 +1029,41 @@ def get_field(
10081029
"""Returns the stream ID for the given key."""
10091030
return getattr(self, key.value)
10101031

1032+
def is_before_or_eq(self, other_token: "StreamToken") -> bool:
1033+
"""Whether this token is before or equal to the other token, i.e. every
1034+
constituent part is before or equal to the other.
1035+
1036+
Essentially it is `self <= other`.
1037+
1038+
Note: if `self.is_before_or_eq(other_token) is False` then that does not
1039+
imply that the reverse is True.
1040+
"""
1041+
1042+
for _, key in StreamKeyType.__members__.items():
1043+
if key == StreamKeyType.TYPING:
1044+
# Typing stream is allowed to "reset", and so comparisons don't
1045+
# really make sense as is.
1046+
# TODO: Figure out a better way of tracking resets.
1047+
continue
1048+
1049+
self_value = self.get_field(key)
1050+
other_value = other_token.get_field(key)
1051+
1052+
if isinstance(self_value, RoomStreamToken):
1053+
assert isinstance(other_value, RoomStreamToken)
1054+
if not self_value.is_before_or_eq(other_value):
1055+
return False
1056+
elif isinstance(self_value, MultiWriterStreamToken):
1057+
assert isinstance(other_value, MultiWriterStreamToken)
1058+
if not self_value.is_before_or_eq(other_value):
1059+
return False
1060+
else:
1061+
assert isinstance(other_value, int)
1062+
if self_value > other_value:
1063+
return False
1064+
1065+
return True
1066+
10111067

10121068
StreamToken.START = StreamToken(
10131069
RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0

0 commit comments

Comments
 (0)