diff --git a/changelog.d/18934.feature b/changelog.d/18934.feature new file mode 100644 index 00000000000..e24b7a7e34c --- /dev/null +++ b/changelog.d/18934.feature @@ -0,0 +1 @@ +Update [MSC4284: Policy Servers](https://github.com/matrix-org/matrix-spec-proposals/pull/4284) implementation to support signatures when available. \ No newline at end of file diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 8c59772e56c..eac2d776f92 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -316,7 +316,7 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None: if key_result.valid_until_ts < verify_request.minimum_valid_until_ts: continue - await self._process_json(key_result.verify_key, verify_request) + await self.process_json(key_result.verify_key, verify_request) verified = True if not verified: @@ -326,7 +326,7 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None: Codes.UNAUTHORIZED, ) - async def _process_json( + async def process_json( self, verify_key: VerifyKey, verify_request: VerifyJsonRequest ) -> None: """Processes the `VerifyJsonRequest`. Raises if the signature can't be diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 542d9650d47..41595043d11 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -495,6 +495,43 @@ async def get_pdu_policy_recommendation( ) return RECOMMENDATION_OK + @trace + @tag_args + async def ask_policy_server_to_sign_event( + self, destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> Optional[JsonDict]: + """Requests that the destination server (typically a policy server) + sign the event as not spam. + + If the policy server could not be contacted or the policy server + returned an error, this returns no signature. + + Args: + destination: The remote homeserver to ask (a policy server) + pdu: The event to sign + timeout: How long to try (in ms) the destination for before + giving up. None indicates no timeout. + Returns: + The signature from the policy server, structured in the same was as the 'signatures' + JSON in the event e.g { "$policy_server_via_domain" : { "ed25519:policy_server": "signature_base64" }} + """ + logger.debug( + "ask_policy_server_to_sign_event for event_id=%s from %s", + pdu.event_id, + destination, + ) + try: + return await self.transport_layer.ask_policy_server_to_sign_event( + destination, pdu, timeout=timeout + ) + except Exception as e: + logger.warning( + "ask_policy_server_to_sign_event: server %s responded with error: %s", + destination, + e, + ) + return None + @trace @tag_args async def get_pdu( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 62bf96ce913..5a5dc45f108 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -170,6 +170,32 @@ async def get_policy_recommendation_for_pdu( timeout=timeout, ) + async def ask_policy_server_to_sign_event( + self, destination: str, event: EventBase, timeout: Optional[int] = None + ) -> JsonDict: + """Requests that the destination server (typically a policy server) + sign the event as not spam. + + If the policy server could not be contacted or the policy server + returned an error, this raises that error. + + Args: + destination: The host name of the policy server / homeserver. + event: The event to sign. + timeout: How long to try (in ms) the destination for before giving up. + None indicates no timeout. + Returns: + The signature from the policy server, structured in the same was as the 'signatures' + JSON in the event e.g { "$policy_server_via_domain" : { "ed25519:policy_server": "signature_base64" }} + """ + return await self.client.post_json( + destination=destination, + path="/_matrix/policy/unstable/org.matrix.msc4284/sign", + data=event.get_pdu_json(), + ignore_backoff=True, + timeout=timeout, + ) + async def backfill( self, destination: str, room_id: str, event_tuples: Collection[str], limit: int ) -> Optional[Union[JsonDict, list]]: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c8c86d87498..13b286bccc8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1138,6 +1138,12 @@ async def _create_and_send_nonmember_event_locked( assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( event.sender, ) + # if this room uses a policy server, try to get a signature now. + # We use verify=False here as we are about to call is_event_allowed on the same event + # which will do sig checks. + await self._policy_handler.ask_policy_server_to_sign_event( + event, verify=False + ) policy_allowed = await self._policy_handler.is_event_allowed(event) if not policy_allowed: diff --git a/synapse/handlers/room_policy.py b/synapse/handlers/room_policy.py index 170c477d6f4..0663a367144 100644 --- a/synapse/handlers/room_policy.py +++ b/synapse/handlers/room_policy.py @@ -17,6 +17,11 @@ import logging from typing import TYPE_CHECKING +from signedjson.key import decode_verify_key_bytes +from unpaddedbase64 import decode_base64 + +from synapse.api.errors import SynapseError +from synapse.crypto.keyring import VerifyJsonRequest from synapse.events import EventBase from synapse.types.handlers.policy_server import RECOMMENDATION_OK from synapse.util.stringutils import parse_and_validate_server_name @@ -26,6 +31,9 @@ logger = logging.getLogger(__name__) +POLICY_SERVER_EVENT_TYPE = "org.matrix.msc4284.policy" +POLICY_SERVER_KEY_ID = "ed25519:policy_server" + class RoomPolicyHandler: def __init__(self, hs: "HomeServer"): @@ -54,11 +62,11 @@ async def is_event_allowed(self, event: EventBase) -> bool: Returns: bool: True if the event is allowed in the room, False otherwise. """ - if event.type == "org.matrix.msc4284.policy" and event.state_key is not None: + if event.type == POLICY_SERVER_EVENT_TYPE and event.state_key is not None: return True # always allow policy server change events policy_event = await self._storage_controllers.state.get_current_state_event( - event.room_id, "org.matrix.msc4284.policy", "" + event.room_id, POLICY_SERVER_EVENT_TYPE, "" ) if not policy_event: return True # no policy server == default allow @@ -81,6 +89,22 @@ async def is_event_allowed(self, event: EventBase) -> bool: if not is_in_room: return True # policy server not in room == default allow + # Check if the event has been signed with the public key in the policy server state event. + # If it is, we can save an HTTP hit. + # We actually want to get the policy server state event BEFORE THE EVENT rather than + # the current state value, else changing the public key will cause all of these checks to fail. + # However, if we are checking outlier events (which we will due to is_event_allowed being called + # near the edges at _check_sigs_and_hash) we won't know the state before the event, so the + # only safe option is to use the current state + public_key = policy_event.content.get("public_key", None) + if public_key is not None and isinstance(public_key, str): + valid = await self._verify_policy_server_signature( + event, policy_server, public_key + ) + if valid: + return True + # fallthrough to hit /check manually + # At this point, the server appears valid and is in the room, so ask it to check # the event. recommendation = await self._federation_client.get_pdu_policy_recommendation( @@ -90,3 +114,73 @@ async def is_event_allowed(self, event: EventBase) -> bool: return False return True # default allow + + async def _verify_policy_server_signature( + self, event: EventBase, policy_server: str, public_key: str + ) -> bool: + # check the event is signed with this (via, public_key). + verify_json_req = VerifyJsonRequest.from_event(policy_server, event, 0) + try: + key_bytes = decode_base64(public_key) + verify_key = decode_verify_key_bytes(POLICY_SERVER_KEY_ID, key_bytes) + # We would normally use KeyRing.verify_event_for_server but we can't here as we don't + # want to fetch the server key, and instead want to use the public key in the state event. + await self._hs.get_keyring().process_json(verify_key, verify_json_req) + # if the event is correctly signed by the public key in the policy server state event = Allow + return True + except Exception as ex: + logger.warning( + "failed to verify event using public key in policy server event: %s", ex + ) + return False + + async def ask_policy_server_to_sign_event( + self, event: EventBase, verify: bool = False + ) -> None: + """Ask the policy server to sign this event. The signature is added to the event signatures block. + + Does nothing if there is no policy server state event in the room. If the policy server + refuses to sign the event (as it's marked as spam) does nothing. + + Args: + event: The event to sign + verify: If True, verify that the signature is correctly signed by the public_key in the + policy server state event. + Raises: + if verify=True and the policy server signed the event with an invalid signature. Does + not raise if the policy server refuses to sign the event. + """ + policy_event = await self._storage_controllers.state.get_current_state_event( + event.room_id, POLICY_SERVER_EVENT_TYPE, "" + ) + if not policy_event: + return + policy_server = policy_event.content.get("via", None) + if policy_server is None or not isinstance(policy_server, str): + return + # Only ask to sign events if the policy state event has a public_key (so they can be subsequently verified) + public_key = policy_event.content.get("public_key", None) + if public_key is None or not isinstance(public_key, str): + return + + # Ask the policy server to sign this event. + # We set a smallish timeout here as we don't want to block event sending too long. + signature = await self._federation_client.ask_policy_server_to_sign_event( + policy_server, + event, + timeout=3000, + ) + if ( + # the policy server returns {} if it refuses to sign the event. + signature and len(signature) > 0 + ): + event.signatures.update(signature) + if verify: + is_valid = await self._verify_policy_server_signature( + event, policy_server, public_key + ) + if not is_valid: + raise SynapseError( + 500, + f"policy server {policy_server} failed to sign event correctly", + ) diff --git a/tests/handlers/test_room_policy.py b/tests/handlers/test_room_policy.py index d1d0c484fa6..00da1d942fb 100644 --- a/tests/handlers/test_room_policy.py +++ b/tests/handlers/test_room_policy.py @@ -15,11 +15,17 @@ from typing import Optional from unittest import mock +import signedjson +from signedjson.key import encode_verify_key_base64, get_verify_key + from twisted.internet.testing import MemoryReactor +from synapse.api.errors import SynapseError +from synapse.crypto.event_signing import compute_event_signature from synapse.events import EventBase, make_event_from_dict +from synapse.handlers.room_policy import POLICY_SERVER_KEY_ID from synapse.rest import admin -from synapse.rest.client import login, room +from synapse.rest.client import filter, login, room, sync from synapse.server import HomeServer from synapse.types import JsonDict, UserID from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM @@ -36,16 +42,24 @@ class RoomPolicyTestCase(unittest.FederatingHomeserverTestCase): admin.register_servlets, login.register_servlets, room.register_servlets, + filter.register_servlets, + sync.register_servlets, ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: # mock out the federation transport client self.mock_federation_transport_client = mock.Mock( - spec=["get_policy_recommendation_for_pdu"] + spec=[ + "get_policy_recommendation_for_pdu", + "ask_policy_server_to_sign_event", + ] ) self.mock_federation_transport_client.get_policy_recommendation_for_pdu = ( mock.AsyncMock() ) + self.mock_federation_transport_client.ask_policy_server_to_sign_event = ( + mock.AsyncMock() + ) return super().setup_test_homeserver( federation_transport_client=self.mock_federation_transport_client ) @@ -62,6 +76,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: room_creator=self.creator, tok=self.creator_token ) room_version = self.get_success(main_store.get_room_version(self.room_id)) + self.room_version = room_version + self.signing_key = signedjson.key.generate_signing_key("policy_server") # Create some sample events self.spammy_event = make_event_from_dict( @@ -110,7 +126,48 @@ async def get_policy_recommendation_for_pdu( self.mock_federation_transport_client.get_policy_recommendation_for_pdu.side_effect = get_policy_recommendation_for_pdu - def _add_policy_server_to_room(self) -> None: + # Mock policy server actions on signing events + async def policy_server_signs_event( + destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> Optional[JsonDict]: + sigs = compute_event_signature( + pdu.room_version, + pdu.get_dict(), + self.OTHER_SERVER_NAME, + self.signing_key, + ) + return sigs + + async def policy_server_signs_event_with_wrong_key( + destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> Optional[JsonDict]: + sk = signedjson.key.generate_signing_key("policy_server") + sigs = compute_event_signature( + pdu.room_version, + pdu.get_dict(), + self.OTHER_SERVER_NAME, + sk, + ) + return sigs + + async def policy_server_refuses_to_sign_event( + destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> Optional[JsonDict]: + return {} + + async def policy_server_event_sign_error( + destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> Optional[JsonDict]: + return None + + self.policy_server_signs_event = policy_server_signs_event + self.policy_server_refuses_to_sign_event = policy_server_refuses_to_sign_event + self.policy_server_event_sign_error = policy_server_event_sign_error + self.policy_server_signs_event_with_wrong_key = ( + policy_server_signs_event_with_wrong_key + ) + + def _add_policy_server_to_room(self, public_key: Optional[str] = None) -> None: # Inject a member event into the room policy_user_id = f"@policy:{self.OTHER_SERVER_NAME}" self.get_success( @@ -118,12 +175,15 @@ def _add_policy_server_to_room(self) -> None: self.hs, self.room_id, policy_user_id, "join" ) ) + content = { + "via": self.OTHER_SERVER_NAME, + } + if public_key is not None: + content["public_key"] = public_key self.helper.send_state( self.room_id, "org.matrix.msc4284.policy", - { - "via": self.OTHER_SERVER_NAME, - }, + content, tok=self.creator_token, state_key="", ) @@ -218,9 +278,192 @@ def test_spammy_event_is_spam(self) -> None: self.assertEqual(ok, False) self.assertEqual(self.call_count, 1) - def test_not_spammy_event_is_not_spam(self) -> None: - self._add_policy_server_to_room() + def test_signed_event_is_not_spam(self) -> None: + verify_key_str = encode_verify_key_base64(get_verify_key(self.signing_key)) + self._add_policy_server_to_room(public_key=verify_key_str) + event = make_event_from_dict( + room_version=self.room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is a signed event.", + }, + }, + ) + + # We're going to sign the event and check it marks the event as not-spam, without hitting the + # policy server + sigs = compute_event_signature( + event.room_version, + event.get_dict(), + self.OTHER_SERVER_NAME, + self.signing_key, + ) + event.signatures.update(sigs) - ok = self.get_success(self.handler.is_event_allowed(self.not_spammy_event)) + ok = self.get_success(self.handler.is_event_allowed(event)) self.assertEqual(ok, True) - self.assertEqual(self.call_count, 1) + # Make sure we did not make an HTTP hit to get_policy_recommendation_for_pdu + self.assertEqual(self.call_count, 0) + + def test_ask_policy_server_to_sign_event_ok(self) -> None: + verify_key_str = encode_verify_key_base64(get_verify_key(self.signing_key)) + self._add_policy_server_to_room(public_key=verify_key_str) + event = make_event_from_dict( + room_version=self.room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is another signed event.", + }, + }, + ) + self.mock_federation_transport_client.ask_policy_server_to_sign_event.side_effect = self.policy_server_signs_event + self.get_success( + self.handler.ask_policy_server_to_sign_event(event, verify=True) + ) + self.assertEqual(len(event.signatures), 1) + + def test_ask_policy_server_to_sign_event_refuses(self) -> None: + verify_key_str = encode_verify_key_base64(get_verify_key(self.signing_key)) + self._add_policy_server_to_room(public_key=verify_key_str) + event = make_event_from_dict( + room_version=self.room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is spam and is refused.", + }, + }, + ) + self.mock_federation_transport_client.ask_policy_server_to_sign_event.side_effect = self.policy_server_refuses_to_sign_event + self.get_success( + self.handler.ask_policy_server_to_sign_event(event, verify=True) + ) + self.assertEqual(len(event.signatures), 0) + + def test_ask_policy_server_to_sign_event_cannot_reach(self) -> None: + verify_key_str = encode_verify_key_base64(get_verify_key(self.signing_key)) + self._add_policy_server_to_room(public_key=verify_key_str) + event = make_event_from_dict( + room_version=self.room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is spam and is refused.", + }, + }, + ) + self.mock_federation_transport_client.ask_policy_server_to_sign_event.side_effect = self.policy_server_event_sign_error + self.get_success( + self.handler.ask_policy_server_to_sign_event(event, verify=True) + ) + self.assertEqual(len(event.signatures), 0) + + def test_ask_policy_server_to_sign_event_wrong_sig(self) -> None: + verify_key_str = encode_verify_key_base64(get_verify_key(self.signing_key)) + self._add_policy_server_to_room(public_key=verify_key_str) + self.mock_federation_transport_client.ask_policy_server_to_sign_event.side_effect = self.policy_server_signs_event_with_wrong_key + unverified_event = make_event_from_dict( + room_version=self.room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is signed but with the wrong key.", + }, + }, + ) + # verify=False so it passes + self.get_success( + self.handler.ask_policy_server_to_sign_event(unverified_event, verify=False) + ) + self.assertEqual(len(unverified_event.signatures), 1) + + verified_event = make_event_from_dict( + room_version=self.room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is signed but with the wrong key.", + }, + }, + ) + # verify=True so it fails + self.get_failure( + self.handler.ask_policy_server_to_sign_event(verified_event, verify=True), + SynapseError, + ) + + def test_policy_server_signatures_end_to_end(self) -> None: + verify_key_str = encode_verify_key_base64(get_verify_key(self.signing_key)) + self._add_policy_server_to_room(public_key=verify_key_str) + self.mock_federation_transport_client.ask_policy_server_to_sign_event.side_effect = self.policy_server_signs_event + # Send an event and ensure we get a policy server signature on it. + resp = self.helper.send_event( + self.room_id, + "m.room.message", + {"body": "honk", "msgtype": "m.text"}, + tok=self.creator_token, + ) + ev = self._fetch_federation_event(resp["event_id"]) + assert ev is not None + sig = ( + ev.get("signatures", {}) + .get(self.OTHER_SERVER_NAME, {}) + .get(POLICY_SERVER_KEY_ID, None) + ) + self.assertNotEquals( + sig, + None, + f"event did not include policy server signature, signature block = {ev.get('signatures', None)}", + ) + + def _fetch_federation_event(self, event_id: str) -> Optional[JsonDict]: + # Request federation events to see the signatures + channel = self.make_request( + "POST", + "/_matrix/client/v3/user/%s/filter" % (self.creator), + {"event_format": "federation"}, + self.creator_token, + ) + self.assertEqual(channel.code, 200) + filter_id = channel.json_body["filter_id"] + # Note: we could use `/context`, but given we don't test that neutral events are + # delivered over `/sync` anywhere else, might as well implicitly test it here. + channel = self.make_request( + "GET", + "/sync?filter=%s" % filter_id, + access_token=self.creator_token, + ) + self.assertEqual(channel.code, 200, channel.result) + + for ev in channel.json_body["rooms"]["join"][self.room_id]["timeline"][ + "events" + ]: + if ev["event_id"] == event_id: + return ev + return None