Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f9e6e53
Configurable /sync/e2ee endpoint
MadLittleMods May 6, 2024
1e05a05
Add Sliding Sync `/sync/e2ee` endpoint for To-Device messages
MadLittleMods May 7, 2024
5e925f6
Share tests with test_sendtodevice
MadLittleMods May 8, 2024
69f9143
Comment on tests
MadLittleMods May 8, 2024
d4ff933
Prefer Sync v2 vs Sliding Sync distinction
MadLittleMods May 8, 2024
371ec57
Fix wait_for_sync_for_user in tests
MadLittleMods May 8, 2024
06d12e5
Ugly overloads
MadLittleMods May 8, 2024
b8b70ba
Fix lint
MadLittleMods May 8, 2024
c60a4f8
Add changelog
MadLittleMods May 8, 2024
10ffae6
Shared logic for `get_sync_result_builder()`
MadLittleMods May 8, 2024
6bf4896
Try calculate more
MadLittleMods May 9, 2024
8871dac
Share tests using inheritance
MadLittleMods May 9, 2024
0892283
Add comments docs
MadLittleMods May 9, 2024
adb7e20
Consolidate device_lists /sync tests
MadLittleMods May 9, 2024
f098355
Add `device_one_time_keys_count` tests
MadLittleMods May 9, 2024
6b7cfd7
Add tests for `device_unused_fallback_key_types` in `/sync`
MadLittleMods May 9, 2024
b9e5379
Describe test
MadLittleMods May 9, 2024
9bdfa16
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 16, 2024
7331401
Lint
MadLittleMods May 16, 2024
b23abca
Fix test inheritance
MadLittleMods May 16, 2024
821a1b3
Add missing field to docstring
MadLittleMods May 16, 2024
35ca937
Format docstring
MadLittleMods May 16, 2024
4ad7a8b
No need to change this formatting from develop
MadLittleMods May 16, 2024
3092ab5
Calculate room derived membership info for device_lists
MadLittleMods May 20, 2024
3539abe
Membership in timeline for better derived info
MadLittleMods May 20, 2024
5f194f9
Exclude application services
MadLittleMods May 20, 2024
02cecfa
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 20, 2024
f6122ff
Use `client_patterns()` for endpoint URL
MadLittleMods May 21, 2024
2f112e7
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 21, 2024
c2221bb
Lint
MadLittleMods May 21, 2024
717b160
Adjust wording, add todo
MadLittleMods May 21, 2024
514aba5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 22, 2024
9749795
Update filter to be more precise and avoid more work
MadLittleMods May 22, 2024
06ac1da
Restore copyright header
MadLittleMods May 22, 2024
3da6bc1
Use `@parameterized_class`
MadLittleMods May 22, 2024
d4b41aa
Fix lints
MadLittleMods May 22, 2024
6606ac1
Add docstring for parametized attributes
MadLittleMods May 23, 2024
ab0b844
Add actual typing for params (not just docstrings)
MadLittleMods May 23, 2024
a482545
Fix test after removing type ignore
MadLittleMods May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)

# MSC3575 (Sliding Sync API endpoints)
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", False)

# MSC3773: Thread notifications
self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)

Expand Down
114 changes: 105 additions & 9 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -112,12 +113,30 @@
SyncRequestKey = Tuple[Any, ...]


class SyncVersion(Enum):
"""
Enum for specifying the version of sync request. This is used to key which type of
sync response that we are generating.

This is different than the `sync_type` you might see used in other code below; which
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""

# These string values are semantically significant because they are used in the the
# metrics

# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"
# Part of MSC3575 Sliding Sync
E2EE_SYNC = "e2ee_sync"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
filter_collection: FilterCollection
is_guest: bool
request_key: SyncRequestKey
device_id: Optional[str]


Expand Down Expand Up @@ -263,6 +282,15 @@ def __bool__(self) -> bool:
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeSyncResult:
next_batch: StreamToken
to_device: List[JsonDict]
# device_lists: DeviceListUpdates
# device_one_time_keys_count: JsonMapping
# device_unused_fallback_key_types: List[str]


class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
Expand Down Expand Up @@ -309,13 +337,18 @@ async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.

Args:
request_key: The key to use for caching the response.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand All @@ -324,9 +357,10 @@ async def wait_for_sync_for_user(
await self.auth_blocking.check_auth_blocking(requester=requester)

res = await self.response_cache.wrap(
sync_config.request_key,
request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
Expand All @@ -338,6 +372,7 @@ async def wait_for_sync_for_user(
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
Expand All @@ -363,9 +398,11 @@ async def _wait_for_sync_for_user(
else:
sync_type = "incremental_sync"

sync_label = f"{sync_version}:{sync_type}"

context = current_context()
if context:
context.tag = sync_type
context.tag = sync_label

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
Expand All @@ -384,14 +421,16 @@ async def _wait_for_sync_for_user(
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)

result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
Expand All @@ -416,13 +455,14 @@ async def current_sync_callback(
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()

return result

async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
Expand All @@ -434,9 +474,21 @@ async def current_sync_for_user(
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)

# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_version == SyncVersion.E2EE_SYNC:
sync_result = await self.generate_e2ee_sync_result(
sync_config, since_token
)
else:
raise Exception(
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
Expand Down Expand Up @@ -1751,6 +1803,50 @@ async def generate_sync_result(
next_batch=sync_result_builder.now_token,
)

async def generate_e2ee_sync_result(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
) -> SyncResult:
"""Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result."""

user_id = sync_config.user.to_string()
# TODO: Should we exclude app services here? There could be an argument to allow
# them since the appservice doesn't have to make a massive initial sync.
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

joined_room_ids = await self.store.get_rooms_for_user(user_id)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state=False,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
# Dummy values to fill out `SyncResultBuilder`
excluded_room_ids=frozenset({}),
forced_newly_joined_room_ids=frozenset({}),
membership_change_events=frozenset({}),
)

await self._generate_sync_entry_for_to_device(sync_result_builder)

return E2eeSyncResult(
to_device=sync_result_builder.to_device,
# to_device: List[JsonDict]
# device_lists: DeviceListUpdates
# device_one_time_keys_count: JsonMapping
# device_unused_fallback_key_types: List[str]
next_batch=sync_result_builder.now_token,
)

@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,
Expand Down
110 changes: 109 additions & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
import re
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

Expand All @@ -40,6 +41,7 @@
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncVersion,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
Expand Down Expand Up @@ -197,7 +199,6 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user=user,
filter_collection=filter_collection,
is_guest=requester.is_guest,
request_key=request_key,
device_id=device_id,
)

Expand All @@ -220,6 +221,8 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
request_key,
since_token=since_token,
timeout=timeout,
full_state=full_state,
Expand Down Expand Up @@ -553,5 +556,110 @@ async def encode_room(
return result


class SlidingSyncE2eeRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
of Sliding Sync but doesn't have any sliding window component. It's just a way to
get E2EE events without having to sit through a big initial sync (`/sync` v2). And
we can avoid encryption events being backed up by the main sync response.

GET parameters::
timeout(int): How long to wait for new events in milliseconds.
since(batch_token): Batch token when asking for incremental deltas.

Response JSON::
{
"next_batch": // batch token for the next /sync
"to_device": {
// list of to-device events
"events": [
{
"content: { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
"type": "m.room.encrypted",
"sender": "@alice:example.com",
}
// ...
]
},
"device_one_time_keys_count": {
"signed_curve25519": 50
},
"device_lists": {
"changed": ["@alice:example.com"],
"left": ["@bob:example.com"]
},
"device_unused_fallback_key_types": [
"signed_curve25519"
]
}
"""

PATTERNS = (re.compile("^/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee$"),)

def __init__(self, hs: "HomeServer"):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.filtering = hs.get_filtering()
self.sync_handler = hs.get_sync_handler()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user = requester.user
device_id = requester.device_id

timeout = parse_integer(request, "timeout", default=0)
since = parse_string(request, "since")

sync_config = SyncConfig(
user=user,
# Filtering doesn't apply to this endpoint so just use a default to fill in
# the SyncConfig
filter_collection=self.filtering.DEFAULT_FILTER_COLLECTION,
is_guest=requester.is_guest,
device_id=device_id,
)

since_token = None
if since is not None:
since_token = await StreamToken.from_string(self.store, since)

# Request cache key
request_key = (
SyncVersion.SYNC_V2,
user,
timeout,
since,
)

# Gather data for the response
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
request_key,
since_token=since_token,
timeout=timeout,
full_state=False,
)

# The client may have disconnected by now; don't bother to serialize the
# response if so.
if request._disconnected:
logger.info("Client has disconnected; not serializing response.")
return 200, {}

response: JsonDict = defaultdict(dict)
response["next_batch"] = await sync_result.next_batch.to_string(self.store)

if sync_result.to_device:
response["to_device"] = {"events": sync_result.to_device}

return 200, response


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)

if hs.config.experimental.msc3575_enabled:
SlidingSyncE2eeRestServlet(hs).register(http_server)
Loading