Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17127.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug which meant that to-device messages received over federation could be dropped when the server was under load, or when networking problems disrupted communication between Synapse processes or with the database.
44 changes: 26 additions & 18 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,25 @@ async def _process_edu(edu_dict: JsonDict) -> None:
edu_type=edu_dict["edu_type"],
content=edu_dict["content"],
)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
try:
await self.registry.on_edu(edu.edu_type, origin, edu.content)
except Exception:
# If there was an error handling the EDU, we must reject the
# transaction.
#
# Some EDU types (notably, to-device messages) are, despite their name,
# expected to be reliable; if we weren't able to do something with it,
# we have to tell the sender that, and the only way the protocol gives
# us to do so is by sending an HTTP error back on the transaction.
#
# We log the exception now, and then raise a new SynapseError to cause
# the transaction to be failed.
logger.exception("Error handling EDU of type %s", edu.edu_type)
raise SynapseError(500, f"Error handing EDU of type {edu.edu_type}")

# TODO: if the first EDU fails, we should probably abort the whole
# thing rather than carrying on with the rest of them. That would
# probably be best done inside `concurrently_execute`.

await concurrently_execute(
_process_edu,
Expand Down Expand Up @@ -1414,12 +1432,7 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
handler = self.edu_handlers.get(edu_type)
if handler:
with start_active_span_from_edu(content, "handle_edu"):
try:
await handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
await handler(origin, content)
return

# Check if we can route it somewhere else that isn't us
Expand All @@ -1428,17 +1441,12 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)

try:
await self._send_edu(
instance_name=route_to,
edu_type=edu_type,
origin=origin,
content=content,
)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
await self._send_edu(
instance_name=route_to,
edu_type=edu_type,
origin=origin,
content=content,
)
return

# Oh well, let's just log and move on.
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
"""
Handle receiving to-device messages from remote homeservers.

Note that any errors thrown from this method will cause the federation /send
request to receive an error response.

Args:
origin: The remote homeserver.
content: The JSON dictionary containing the to-device messages.
Expand Down
17 changes: 17 additions & 0 deletions tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ def test_bad_request(self, query_content: bytes) -> None:
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON")

def test_failed_edu_causes_500(self) -> None:
    """A /send transaction whose EDU handler raises must be answered with a 500."""

    async def always_raises(_origin: str, _content: JsonDict) -> None:
        # Stand-in for an EDU handler that fails while processing.
        raise Exception("bleh")

    registry = self.hs.get_federation_registry()
    registry.register_edu_handler("FAIL_EDU_TYPE", always_raises)

    # A transaction carrying a single EDU of the type registered above.
    transaction = {"edus": [{"edu_type": "FAIL_EDU_TYPE", "content": {}}]}
    channel = self.make_signed_federation_request(
        "PUT",
        "/_matrix/federation/v1/send/txn",
        transaction,
    )

    self.assertEqual(500, channel.code, channel.result)


class ServerACLsTestCase(unittest.TestCase):
def test_blocked_server(self) -> None:
Expand Down