Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 03aff4c

Browse files
authored
Add a worker store for search insertion. (#7516)
This is required as both event persistence and the background update needs access to this function. It should be perfectly safe for two workers to write to that table at the same time.
1 parent 16090a0 commit 03aff4c

File tree

3 files changed

+52
-47
lines changed

3 files changed

+52
-47
lines changed

changelog.d/7516.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a worker store for search insertion, required for moving event persistence off master.

synapse/app/generic_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
MonthlyActiveUsersWorkerStore,
123123
)
124124
from synapse.storage.data_stores.main.presence import UserPresenceState
125+
from synapse.storage.data_stores.main.search import SearchWorkerStore
125126
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
126127
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
127128
from synapse.types import ReadReceipt
@@ -451,6 +452,7 @@ class GenericWorkerSlavedStore(
451452
SlavedFilteringStore,
452453
MonthlyActiveUsersWorkerStore,
453454
MediaRepositoryStore,
455+
SearchWorkerStore,
454456
BaseSlavedStore,
455457
):
456458
def __init__(self, database, db_conn, hs):

synapse/storage/data_stores/main/search.py

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,55 @@
3737
)
3838

3939

40-
class SearchBackgroundUpdateStore(SQLBaseStore):
40+
class SearchWorkerStore(SQLBaseStore):
41+
def store_search_entries_txn(self, txn, entries):
42+
"""Add entries to the search table
43+
44+
Args:
45+
txn (cursor):
46+
entries (iterable[SearchEntry]):
47+
entries to be added to the table
48+
"""
49+
if not self.hs.config.enable_search:
50+
return
51+
if isinstance(self.database_engine, PostgresEngine):
52+
sql = (
53+
"INSERT INTO event_search"
54+
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
55+
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
56+
)
57+
58+
args = (
59+
(
60+
entry.event_id,
61+
entry.room_id,
62+
entry.key,
63+
entry.value,
64+
entry.stream_ordering,
65+
entry.origin_server_ts,
66+
)
67+
for entry in entries
68+
)
69+
70+
txn.executemany(sql, args)
71+
72+
elif isinstance(self.database_engine, Sqlite3Engine):
73+
sql = (
74+
"INSERT INTO event_search (event_id, room_id, key, value)"
75+
" VALUES (?,?,?,?)"
76+
)
77+
args = (
78+
(entry.event_id, entry.room_id, entry.key, entry.value)
79+
for entry in entries
80+
)
81+
82+
txn.executemany(sql, args)
83+
else:
84+
# This should be unreachable.
85+
raise Exception("Unrecognized database engine")
86+
87+
88+
class SearchBackgroundUpdateStore(SearchWorkerStore):
4189

4290
EVENT_SEARCH_UPDATE_NAME = "event_search"
4391
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
@@ -296,52 +344,6 @@ def reindex_search_txn(txn):
296344

297345
return num_rows
298346

299-
def store_search_entries_txn(self, txn, entries):
300-
"""Add entries to the search table
301-
302-
Args:
303-
txn (cursor):
304-
entries (iterable[SearchEntry]):
305-
entries to be added to the table
306-
"""
307-
if not self.hs.config.enable_search:
308-
return
309-
if isinstance(self.database_engine, PostgresEngine):
310-
sql = (
311-
"INSERT INTO event_search"
312-
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
313-
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
314-
)
315-
316-
args = (
317-
(
318-
entry.event_id,
319-
entry.room_id,
320-
entry.key,
321-
entry.value,
322-
entry.stream_ordering,
323-
entry.origin_server_ts,
324-
)
325-
for entry in entries
326-
)
327-
328-
txn.executemany(sql, args)
329-
330-
elif isinstance(self.database_engine, Sqlite3Engine):
331-
sql = (
332-
"INSERT INTO event_search (event_id, room_id, key, value)"
333-
" VALUES (?,?,?,?)"
334-
)
335-
args = (
336-
(entry.event_id, entry.room_id, entry.key, entry.value)
337-
for entry in entries
338-
)
339-
340-
txn.executemany(sql, args)
341-
else:
342-
# This should be unreachable.
343-
raise Exception("Unrecognized database engine")
344-
345347

346348
class SearchStore(SearchBackgroundUpdateStore):
347349
def __init__(self, database: Database, db_conn, hs):

0 commit comments

Comments
 (0)