Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11660.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance when fetching bundled aggregations for multiple events.
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1783,9 +1783,7 @@ def _handle_event_relations(
)

if rel_type == RelationTypes.REPLACE:
txn.call_after(
self.store.get_applicable_edit.invalidate, (parent_id, event.room_id)
)
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))

if rel_type == RelationTypes.THREAD:
txn.call_after(
Expand Down
115 changes: 80 additions & 35 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
Iterable,
List,
Expand All @@ -28,7 +29,7 @@
import attr
from frozendict import frozendict

from synapse.api.constants import EventTypes, RelationTypes
from synapse.api.constants import RelationTypes
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
Expand All @@ -44,6 +45,7 @@
RelationPaginationToken,
)
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -63,6 +65,11 @@ def __init__(
self._msc1849_enabled = hs.config.experimental.msc1849_enabled
self._msc3440_enabled = hs.config.experimental.msc3440_enabled

self.get_applicable_edit: LruCache[str, Optional[EventBase]] = LruCache(
cache_name="get_applicable_edit",
max_size=hs.config.caches.event_cache_size, # TODO
)

@cached(tree=True)
async def get_relations_for_event(
self,
Expand Down Expand Up @@ -325,60 +332,96 @@ def _get_aggregation_groups_for_event_txn(
"get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn
)

@cached()
async def get_applicable_edit(
self, event_id: str, room_id: str
) -> Optional[EventBase]:
async def _get_applicable_edits(
self, event_ids: Iterable[str]
) -> Dict[str, EventBase]:
"""Get the most recent edit (if any) that has happened for the given
event.
events.

Correctly handles checking whether edits were allowed to happen.

Args:
event_id: The original event ID
room_id: The original event's room ID
event_ids: The original event IDs

Returns:
The most recent edit, if any.
A map of the most recent edit for each event. A missing event implies
there are no edits.
"""

# A map of the original event IDs to the edit events.
edits_by_original = {}

# Check if an edit for this event is currently cached.
event_ids_to_check = []
for event_id in event_ids:
if event_id not in self.get_applicable_edit:
event_ids_to_check.append(event_id)
else:
edit_event = self.get_applicable_edit[event_id]
if edit_event:
edits_by_original[event_id] = edit_event

# If all events were cached, all done.
if not event_ids_to_check:
return edits_by_original

# We only allow edits for `m.room.message` events that have the same sender
# and event type. We can't assert these things during regular event auth so
# we have to do the checks post hoc.

# Fetches latest edit that has the same type and sender as the
# original, and is an `m.room.message`.
#
# TODO Should this ensure it does not return results for state events / redacted events?
sql = """
SELECT edit.event_id FROM events AS edit
SELECT original.event_id, edit.event_id FROM events AS edit
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would only pull out the latest edit event per original event, but I haven't found a reasonable way to do that (maybe a lateral join, but that's not supported on sqlite). Any thoughts on how to improve this would be appreciated!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does latest edit mean "edit with largest depth"?

Might we be able to use a trick like https://stackoverflow.com/a/27802817/5252017 ? (Sorry, not an expert here)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means the largest origin_server_ts (which is why we're ordering by that descending.)

Copy link
Contributor

@reivilibre reivilibre Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a lateral join, but that's not supported on sqlite

N.B. SQLite does support lateral joins as long as you don't write the word LATERAL — if that's the only thing blocking you, you should be able to work around that by only inserting that word for Postgres.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reivilibre Do you have any reference for that? I've been unable to get it to work (and my searching online has yielded "you can't do this on SQLite").

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Postgres you can do SELECT DISTINCT ON (original.event_id) ... which will choose the first row (as defined by the ORDER BY clause). I don't think SQLite has that, and so you would need to use a different query, at which point you may as well fall back to the old behaviour.

Potentially you could use window functions and first_value , which I think both postgres and sqlite support, but those sorts of queries really are voodoo magic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ @erikjohnston Thank you! That gave me some breadcrumbs to realize we do something very similar elsewhere:

if isinstance(self.database_engine, PostgresEngine):
# The `DISTINCT ON` clause will pick the *first* row it
# encounters, so ordering by stream ID desc will ensure we get
# the latest key.
sql = """
SELECT DISTINCT ON (user_id, keytype) user_id, keytype, keydata, stream_id
FROM e2e_cross_signing_keys
WHERE %(clause)s
ORDER BY user_id, keytype, stream_id DESC
""" % {
"clause": clause
}
else:
# SQLite has special handling for bare columns when using
# MIN/MAX with a `GROUP BY` clause where it picks the value from
# a row that matches the MIN/MAX.
sql = """
SELECT user_id, keytype, keydata, MAX(stream_id)
FROM e2e_cross_signing_keys
WHERE %(clause)s
GROUP BY user_id, keytype
""" % {
"clause": clause
}

I think we can abstract out the changes in 8400c20, but that is proving a bit tedious / invasive so I'd like to do it separately.

INNER JOIN event_relations USING (event_id)
INNER JOIN events AS original ON
original.event_id = relates_to_id
AND edit.type = original.type
AND edit.sender = original.sender
AND edit.room_id = original.room_id
WHERE
relates_to_id = ?
%s
AND relation_type = ?
AND edit.room_id = ?
AND edit.type = 'm.room.message'
ORDER by edit.origin_server_ts DESC, edit.event_id DESC
LIMIT 1
"""

def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]:
txn.execute(sql, (event_id, RelationTypes.REPLACE, room_id))
row = txn.fetchone()
if row:
return row[0]
return None

edit_id = await self.db_pool.runInteraction(
def _get_applicable_edit_txn(txn: LoggingTransaction) -> Dict[str, str]:
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids_to_check
)
args.append(RelationTypes.REPLACE)

txn.execute(sql % (clause,), args)
rows = txn.fetchall()
result = {}
for original_event_id, edit_event_id in rows:
# Only consider the latest edit (by origin server ts).
if original_event_id not in result:
result[original_event_id] = edit_event_id
return result

edit_ids = await self.db_pool.runInteraction(
"get_applicable_edit", _get_applicable_edit_txn
)

if not edit_id:
return None
edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this class depend on EventsWorkerStore instead of ignoring the error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can attempt to do that! Note that we have similar ignores all over, see #11165.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives other errors (similar to #11165) about inconsistent MROs. I'm going to leave this to be solved in #11165.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair!


# Add the newly checked events to the cache. If an edit exists, add it to
# the results.
for original_event_id in event_ids_to_check:
# There might not be an edit or the event might not be known. In
# either case, cache None.
edit_event_id = edit_ids.get(original_event_id)
edit_event = edits.get(edit_event_id)

self.get_applicable_edit.set(original_event_id, edit_event)
if edit_event:
edits_by_original[original_event_id] = edit_event

return await self.get_event(edit_id, allow_none=True) # type: ignore[attr-defined]
return edits_by_original

@cached()
async def get_thread_summary(
Expand Down Expand Up @@ -589,13 +632,6 @@ async def _get_bundled_aggregation_for_event(
if references.chunk:
aggregations[RelationTypes.REFERENCE] = references.to_dict()

edit = None
if event.type == EventTypes.Message:
edit = await self.get_applicable_edit(event_id, room_id)

if edit:
aggregations[RelationTypes.REPLACE] = edit

# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
Expand All @@ -613,7 +649,7 @@ async def _get_bundled_aggregation_for_event(
return aggregations

async def get_bundled_aggregations(
self, events: Iterable[EventBase]
self, events: Collection[EventBase]
) -> Dict[str, Dict[str, Any]]:
"""Generate bundled aggregations for events.

Expand All @@ -628,12 +664,21 @@ async def get_bundled_aggregations(
if not self._msc1849_enabled:
return {}

# TODO Parallelize.
results = {}
# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, Dict[str, Any]] = {}

event_ids = [event.event_id for event in events]

# Fetch any edits.
edits = await self._get_applicable_edits(event_ids)
for event_id, edit in edits.items():
results.setdefault(event_id, {})[RelationTypes.REPLACE] = edit

# Fetch other relations per event.
for event in events:
event_result = await self._get_bundled_aggregation_for_event(event)
if event_result is not None:
results[event.event_id] = event_result
results.setdefault(event.event_id, {}).update(event_result)

return results

Expand Down