-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Rewrite userdir to be faster #4537
Changes from all commits
07b82a2
f993fd6
e8dd750
cf079f3
e818649
35b33d1
c123c71
1b271f2
ee98058
994243e
b845be4
487bdc0
e7e94d7
be4f84b
84a0240
060c5fb
766b86d
1b3bc5b
cf03ec7
c3b168f
bd66799
2b3f166
c6a68b4
f21dcc7
b11de34
829fb4a
2cd5abc
e224f9c
ebe8bb5
aa13be6
3c4d418
ff78918
4716266
d3e216a
75174bb
f385f75
a88c1d6
28239aa
a689842
06a93b0
fab3b33
2574889
a50de76
960b3f0
d64fbc6
7c5b66e
59334f5
42d0d3e
8110905
d0d8a6e
be857fd
8e64d86
e652138
43b71b3
6c46504
95ed0fa
385a075
1e7d2a4
eed6db8
2bf1d6a
72e74f4
1335416
c628aaa
843d287
f35b3cf
8e963dd
2847e65
ea0cc09
765c458
a8c48b5
3408e8d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| The user directory has been rewritten to make it faster, with less chance of falling behind on a large server. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
|
|
||
| import logging | ||
|
|
||
| from six import iteritems | ||
| from six import iteritems, iterkeys | ||
|
|
||
| from twisted.internet import defer | ||
|
|
||
|
|
@@ -63,10 +63,6 @@ def __init__(self, hs): | |
| # When we start up for the first time we need to populate the user_directory. | ||
| # This is a set of user_ids we've inserted already | ||
| self.initially_handled_users = set() | ||
| self.initially_handled_users_in_public = set() | ||
|
|
||
| self.initially_handled_users_share = set() | ||
| self.initially_handled_users_share_private_room = set() | ||
|
|
||
| # The current position in the current_state_delta stream | ||
| self.pos = None | ||
|
|
@@ -140,7 +136,6 @@ def handle_user_deactivated(self, user_id): | |
| # FIXME(#3714): We should probably do this in the same worker as all | ||
| # the other changes. | ||
| yield self.store.remove_from_user_dir(user_id) | ||
| yield self.store.remove_from_user_in_public_room(user_id) | ||
|
|
||
| @defer.inlineCallbacks | ||
| def _unsafe_process(self): | ||
|
|
@@ -215,15 +210,13 @@ def _do_initial_spam(self): | |
| logger.info("Processed all users") | ||
|
|
||
| self.initially_handled_users = None | ||
| self.initially_handled_users_in_public = None | ||
| self.initially_handled_users_share = None | ||
| self.initially_handled_users_share_private_room = None | ||
|
|
||
| yield self.store.update_user_directory_stream_pos(new_pos) | ||
|
|
||
| @defer.inlineCallbacks | ||
| def _handle_initial_room(self, room_id): | ||
| """Called when we initially fill out user_directory one room at a time | ||
| """ | ||
| Called when we initially fill out user_directory one room at a time | ||
| """ | ||
| is_in_room = yield self.store.is_host_joined(room_id, self.server_name) | ||
| if not is_in_room: | ||
|
|
@@ -238,23 +231,15 @@ def _handle_initial_room(self, room_id): | |
| unhandled_users = user_ids - self.initially_handled_users | ||
|
|
||
| yield self.store.add_profiles_to_user_dir( | ||
| room_id, | ||
| {user_id: users_with_profile[user_id] for user_id in unhandled_users}, | ||
| ) | ||
|
|
||
| self.initially_handled_users |= unhandled_users | ||
|
|
||
| if is_public: | ||
| yield self.store.add_users_to_public_room( | ||
| room_id, user_ids=user_ids - self.initially_handled_users_in_public | ||
| ) | ||
| self.initially_handled_users_in_public |= user_ids | ||
|
|
||
| # We now go and figure out the new users who share rooms with user entries | ||
| # We sleep aggressively here as otherwise it can starve resources. | ||
| # We also batch up inserts/updates, but try to avoid too many at once. | ||
| to_insert = set() | ||
| to_update = set() | ||
| count = 0 | ||
| for user_id in user_ids: | ||
| if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: | ||
|
|
@@ -277,44 +262,18 @@ def _handle_initial_room(self, room_id): | |
| count += 1 | ||
|
|
||
| user_set = (user_id, other_user_id) | ||
richvdh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if user_set in self.initially_handled_users_share_private_room: | ||
| continue | ||
|
|
||
| if user_set in self.initially_handled_users_share: | ||
| if is_public: | ||
| continue | ||
| to_update.add(user_set) | ||
| else: | ||
| to_insert.add(user_set) | ||
|
|
||
| if is_public: | ||
| self.initially_handled_users_share.add(user_set) | ||
| else: | ||
| self.initially_handled_users_share_private_room.add(user_set) | ||
| to_insert.add(user_set) | ||
|
|
||
| if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: | ||
| yield self.store.add_users_who_share_room( | ||
| room_id, not is_public, to_insert | ||
| ) | ||
| to_insert.clear() | ||
|
|
||
| if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE: | ||
| yield self.store.update_users_who_share_room( | ||
| room_id, not is_public, to_update | ||
| ) | ||
| to_update.clear() | ||
|
|
||
| if to_insert: | ||
| yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) | ||
| to_insert.clear() | ||
|
|
||
| if to_update: | ||
| yield self.store.update_users_who_share_room( | ||
| room_id, not is_public, to_update | ||
| ) | ||
| to_update.clear() | ||
|
|
||
| @defer.inlineCallbacks | ||
| def _handle_deltas(self, deltas): | ||
| """Called with the state deltas to process | ||
|
|
@@ -356,6 +315,7 @@ def _handle_deltas(self, deltas): | |
| user_ids = yield self.store.get_users_in_dir_due_to_room( | ||
| room_id | ||
| ) | ||
|
|
||
| for user_id in user_ids: | ||
| yield self._handle_remove_user(room_id, user_id) | ||
| return | ||
|
|
@@ -436,14 +396,20 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): | |
| # ignore the change | ||
| return | ||
|
|
||
| if change: | ||
| users_with_profile = yield self.state.get_current_user_in_room(room_id) | ||
| for user_id, profile in iteritems(users_with_profile): | ||
| yield self._handle_new_user(room_id, user_id, profile) | ||
| else: | ||
| users = yield self.store.get_users_in_public_due_to_room(room_id) | ||
| for user_id in users: | ||
| yield self._handle_remove_user(room_id, user_id) | ||
| users_with_profile = yield self.state.get_current_user_in_room(room_id) | ||
|
|
||
| # Remove every user from the sharing tables for that room. | ||
| for user_id in iterkeys(users_with_profile): | ||
| yield self.store.remove_user_who_share_room(user_id, room_id) | ||
|
|
||
| # Then, re-add them to the tables. | ||
| # NOTE: this is not the most efficient method, as _handle_new_user sets | ||
| # up local_user -> other_user and other_user_whos_local -> local_user, | ||
| # which, when run over an entire room, will result in the same values | ||
| # being added multiple times. The batched upserts shouldn't make this | ||
| # too bad, though. | ||
| for user_id, profile in iteritems(users_with_profile): | ||
| yield self._handle_new_user(room_id, user_id, profile) | ||
|
Member
It feels like we're (still) repeating ourselves here. Suppose there are 3 local users in the room: A, B, C. First we call `_handle_new_user` with A. That will add entries: (A, A), (A, B), (A, C), (B, A), (C, A). So most of those entries are being added twice. (Happy if you want to say that's an existing problem, to be ignored for now, but since we're rewriting this it seems a reasonable time to consider it, or at least add a comment.)
Contributor
Author
Without adding yourself, one of those entries is dropped; but yes, the rest are repeated. That's an existing problem.
Member
Ironically the "adding yourself" was the only entry which wasn't being redone.
Fair enough. I'd still be happier if you could add a TODO or something so that I know I'm not going mad next time I look at this code. |
||
|
|
||
| @defer.inlineCallbacks | ||
| def _handle_local_user(self, user_id): | ||
|
|
@@ -457,7 +423,7 @@ def _handle_local_user(self, user_id): | |
|
|
||
| row = yield self.store.get_user_in_directory(user_id) | ||
| if not row: | ||
| yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) | ||
| yield self.store.add_profiles_to_user_dir({user_id: profile}) | ||
|
|
||
| @defer.inlineCallbacks | ||
| def _handle_new_user(self, room_id, user_id, profile): | ||
|
|
@@ -471,55 +437,27 @@ def _handle_new_user(self, room_id, user_id, profile): | |
|
|
||
| row = yield self.store.get_user_in_directory(user_id) | ||
| if not row: | ||
| yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) | ||
| yield self.store.add_profiles_to_user_dir({user_id: profile}) | ||
|
|
||
| is_public = yield self.store.is_room_world_readable_or_publicly_joinable( | ||
| room_id | ||
| ) | ||
|
|
||
| if is_public: | ||
| row = yield self.store.get_user_in_public_room(user_id) | ||
| if not row: | ||
| yield self.store.add_users_to_public_room(room_id, [user_id]) | ||
| else: | ||
| logger.debug("Not adding new user to public dir, %r", user_id) | ||
|
|
||
| # Now we update users who share rooms with users. We do this by getting | ||
| # all the current users in the room and seeing which aren't already | ||
| # marked in the database as sharing with `user_id` | ||
|
|
||
| # Now we update users who share rooms with users. | ||
| users_with_profile = yield self.state.get_current_user_in_room(room_id) | ||
|
|
||
| to_insert = set() | ||
| to_update = set() | ||
|
|
||
| is_appservice = self.store.get_if_app_services_interested_in_user(user_id) | ||
|
|
||
| # First, if they're our user then we need to update for every user | ||
| if self.is_mine_id(user_id) and not is_appservice: | ||
| # Returns a map of other_user_id -> shared_private. We only need | ||
| # to update mappings if for users that either don't share a room | ||
| # already (aren't in the map) or, if the room is private, those that | ||
| # only share a public room. | ||
| user_ids_shared = yield self.store.get_users_who_share_room_from_dir( | ||
| user_id | ||
| ) | ||
| if self.is_mine_id(user_id): | ||
|
|
||
| for other_user_id in users_with_profile: | ||
| if user_id == other_user_id: | ||
| continue | ||
| is_appservice = self.store.get_if_app_services_interested_in_user(user_id) | ||
|
|
||
| # We don't care about appservice users. | ||
| if not is_appservice: | ||
| for other_user_id in users_with_profile: | ||
| if user_id == other_user_id: | ||
| continue | ||
|
|
||
| shared_is_private = user_ids_shared.get(other_user_id) | ||
| if shared_is_private is True: | ||
| # We've already marked in the database they share a private room | ||
| continue | ||
| elif shared_is_private is False: | ||
| # They already share a public room, so only update if this is | ||
| # a private room | ||
| if not is_public: | ||
| to_update.add((user_id, other_user_id)) | ||
| elif shared_is_private is None: | ||
| # This is the first time they both share a room | ||
| to_insert.add((user_id, other_user_id)) | ||
|
|
||
| # Next we need to update for every local user in the room | ||
|
|
@@ -531,29 +469,11 @@ def _handle_new_user(self, room_id, user_id, profile): | |
| other_user_id | ||
| ) | ||
| if self.is_mine_id(other_user_id) and not is_appservice: | ||
| shared_is_private = yield self.store.get_if_users_share_a_room( | ||
| other_user_id, user_id | ||
| ) | ||
| if shared_is_private is True: | ||
| # We've already marked in the database they share a private room | ||
| continue | ||
| elif shared_is_private is False: | ||
| # They already share a public room, so only update if this is | ||
| # a private room | ||
| if not is_public: | ||
| to_update.add((other_user_id, user_id)) | ||
| elif shared_is_private is None: | ||
| # This is the first time they both share a room | ||
| to_insert.add((other_user_id, user_id)) | ||
| to_insert.add((other_user_id, user_id)) | ||
|
|
||
| if to_insert: | ||
| yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) | ||
|
|
||
| if to_update: | ||
| yield self.store.update_users_who_share_room( | ||
| room_id, not is_public, to_update | ||
| ) | ||
|
|
||
| @defer.inlineCallbacks | ||
| def _handle_remove_user(self, room_id, user_id): | ||
| """Called when we might need to remove user from directory | ||
|
|
@@ -562,84 +482,16 @@ def _handle_remove_user(self, room_id, user_id): | |
| room_id (str): room_id that user left or that stopped being public | ||
| user_id (str) | ||
| """ | ||
| logger.debug("Maybe removing user %r", user_id) | ||
|
|
||
| row = yield self.store.get_user_in_directory(user_id) | ||
| update_user_dir = row and row["room_id"] == room_id | ||
|
|
||
| row = yield self.store.get_user_in_public_room(user_id) | ||
| update_user_in_public = row and row["room_id"] == room_id | ||
|
|
||
| if update_user_in_public or update_user_dir: | ||
| # XXX: Make this faster? | ||
| rooms = yield self.store.get_rooms_for_user(user_id) | ||
| for j_room_id in rooms: | ||
| if not update_user_in_public and not update_user_dir: | ||
| break | ||
|
|
||
| is_in_room = yield self.store.is_host_joined( | ||
| j_room_id, self.server_name | ||
| ) | ||
|
|
||
| if not is_in_room: | ||
| continue | ||
|
|
||
| if update_user_dir: | ||
| update_user_dir = False | ||
| yield self.store.update_user_in_user_dir(user_id, j_room_id) | ||
| logger.debug("Removing user %r", user_id) | ||
|
|
||
| is_public = yield self.store.is_room_world_readable_or_publicly_joinable( | ||
| j_room_id | ||
| ) | ||
| # Remove user from sharing tables | ||
| yield self.store.remove_user_who_share_room(user_id, room_id) | ||
|
|
||
| if update_user_in_public and is_public: | ||
| yield self.store.update_user_in_public_user_list(user_id, j_room_id) | ||
| update_user_in_public = False | ||
| # Are they still in a room with members? If not, remove them entirely. | ||
hawkowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id) | ||
|
|
||
| if update_user_dir: | ||
| if len(users_in_room_with) == 0: | ||
| yield self.store.remove_from_user_dir(user_id) | ||
| elif update_user_in_public: | ||
| yield self.store.remove_from_user_in_public_room(user_id) | ||
|
|
||
| # Now handle users_who_share_rooms. | ||
|
|
||
| # Get a list of user tuples that were in the DB due to this room and | ||
| # users (this includes tuples where the other user matches `user_id`) | ||
| user_tuples = yield self.store.get_users_in_share_dir_with_room_id( | ||
| user_id, room_id | ||
| ) | ||
|
|
||
| for user_id, other_user_id in user_tuples: | ||
| # For each user tuple get a list of rooms that they still share, | ||
| # trying to find a private room, and update the entry in the DB | ||
| rooms = yield self.store.get_rooms_in_common_for_users( | ||
| user_id, other_user_id | ||
| ) | ||
|
|
||
| # If they dont share a room anymore, remove the mapping | ||
| if not rooms: | ||
| yield self.store.remove_user_who_share_room(user_id, other_user_id) | ||
| continue | ||
|
|
||
| found_public_share = None | ||
| for j_room_id in rooms: | ||
| is_public = yield self.store.is_room_world_readable_or_publicly_joinable( | ||
| j_room_id | ||
| ) | ||
|
|
||
| if is_public: | ||
| found_public_share = j_room_id | ||
| else: | ||
| found_public_share = None | ||
| yield self.store.update_users_who_share_room( | ||
| room_id, not is_public, [(user_id, other_user_id)] | ||
| ) | ||
| break | ||
|
|
||
| if found_public_share: | ||
| yield self.store.update_users_who_share_room( | ||
| room_id, not is_public, [(user_id, other_user_id)] | ||
| ) | ||
|
|
||
| @defer.inlineCallbacks | ||
| def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| /* Copyright 2017 Vector Creations Ltd, 2019 New Vector Ltd | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| -- Old disused version of the tables below. | ||
| DROP TABLE IF EXISTS users_who_share_rooms; | ||
|
|
||
| -- This is no longer used because it's duplicated by the users_who_share_public_rooms table | ||
| DROP TABLE IF EXISTS users_in_public_rooms; | ||
|
|
||
| -- Tables keeping track of what users share rooms. This is a map of local users | ||
| -- to local or remote users, per room. Remote users cannot be in the user_id | ||
| -- column, only the other_user_id column. There are two tables: one for public | ||
| -- rooms and one for private rooms. | ||
| CREATE TABLE IF NOT EXISTS users_who_share_public_rooms ( | ||
| user_id TEXT NOT NULL, | ||
| other_user_id TEXT NOT NULL, | ||
| room_id TEXT NOT NULL | ||
| ); | ||
|
|
||
| CREATE TABLE IF NOT EXISTS users_who_share_private_rooms ( | ||
| user_id TEXT NOT NULL, | ||
| other_user_id TEXT NOT NULL, | ||
| room_id TEXT NOT NULL | ||
| ); | ||
|
|
||
| CREATE UNIQUE INDEX users_who_share_public_rooms_u_idx ON users_who_share_public_rooms(user_id, other_user_id, room_id); | ||
| CREATE INDEX users_who_share_public_rooms_r_idx ON users_who_share_public_rooms(room_id); | ||
| CREATE INDEX users_who_share_public_rooms_o_idx ON users_who_share_public_rooms(other_user_id); | ||
|
|
||
| CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id); | ||
| CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id); | ||
| CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id); | ||
|
|
||
| -- Make sure that we populate the tables initially by resetting the stream ID | ||
| UPDATE user_directory_stream_pos SET stream_id = NULL; |
Uh oh!
There was an error while loading. Please reload this page.