@@ -35,31 +35,29 @@ from synapse.logging.context import (
3535 make_deferred_yieldable ,
3636 run_in_background ,
3737)
38- from synapse .storage .data_stores .main .client_ips import ClientIpBackgroundUpdateStore
39- from synapse .storage .data_stores .main .deviceinbox import (
40- DeviceInboxBackgroundUpdateStore ,
41- )
42- from synapse .storage .data_stores .main .devices import DeviceBackgroundUpdateStore
43- from synapse .storage .data_stores .main .events_bg_updates import (
38+ from synapse .storage .database import DatabasePool , make_conn
39+ from synapse .storage .databases .main .client_ips import ClientIpBackgroundUpdateStore
40+ from synapse .storage .databases .main .deviceinbox import DeviceInboxBackgroundUpdateStore
41+ from synapse .storage .databases .main .devices import DeviceBackgroundUpdateStore
42+ from synapse .storage .databases .main .events_bg_updates import (
4443 EventsBackgroundUpdatesStore ,
4544)
46- from synapse .storage .data_stores .main .media_repository import (
45+ from synapse .storage .databases .main .media_repository import (
4746 MediaRepositoryBackgroundUpdateStore ,
4847)
49- from synapse .storage .data_stores .main .registration import (
48+ from synapse .storage .databases .main .registration import (
5049 RegistrationBackgroundUpdateStore ,
5150 find_max_generated_user_id_localpart ,
5251)
53- from synapse .storage .data_stores .main .room import RoomBackgroundUpdateStore
54- from synapse .storage .data_stores .main .roommember import RoomMemberBackgroundUpdateStore
55- from synapse .storage .data_stores .main .search import SearchBackgroundUpdateStore
56- from synapse .storage .data_stores .main .state import MainStateBackgroundUpdateStore
57- from synapse .storage .data_stores .main .stats import StatsStore
58- from synapse .storage .data_stores .main .user_directory import (
52+ from synapse .storage .databases .main .room import RoomBackgroundUpdateStore
53+ from synapse .storage .databases .main .roommember import RoomMemberBackgroundUpdateStore
54+ from synapse .storage .databases .main .search import SearchBackgroundUpdateStore
55+ from synapse .storage .databases .main .state import MainStateBackgroundUpdateStore
56+ from synapse .storage .databases .main .stats import StatsStore
57+ from synapse .storage .databases .main .user_directory import (
5958 UserDirectoryBackgroundUpdateStore ,
6059)
61- from synapse .storage .data_stores .state .bg_updates import StateBackgroundUpdateStore
62- from synapse .storage .database import Database , make_conn
60+ from synapse .storage .databases .state .bg_updates import StateBackgroundUpdateStore
6361from synapse .storage .engines import create_engine
6462from synapse .storage .prepare_database import prepare_database
6563from synapse .util import Clock
@@ -175,14 +173,14 @@ class Store(
175173 StatsStore ,
176174):
177175 def execute (self , f , * args , ** kwargs ):
178- return self .db .runInteraction (f .__name__ , f , * args , ** kwargs )
176+ return self .db_pool .runInteraction (f .__name__ , f , * args , ** kwargs )
179177
180178 def execute_sql (self , sql , * args ):
181179 def r (txn ):
182180 txn .execute (sql , args )
183181 return txn .fetchall ()
184182
185- return self .db .runInteraction ("execute_sql" , r )
183+ return self .db_pool .runInteraction ("execute_sql" , r )
186184
187185 def insert_many_txn (self , txn , table , headers , rows ):
188186 sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -227,7 +225,7 @@ class Porter(object):
227225 async def setup_table (self , table ):
228226 if table in APPEND_ONLY_TABLES :
229227 # It's safe to just carry on inserting.
230- row = await self .postgres_store .db .simple_select_one (
228+ row = await self .postgres_store .db_pool .simple_select_one (
231229 table = "port_from_sqlite3" ,
232230 keyvalues = {"table_name" : table },
233231 retcols = ("forward_rowid" , "backward_rowid" ),
@@ -244,7 +242,7 @@ class Porter(object):
244242 ) = await self ._setup_sent_transactions ()
245243 backward_chunk = 0
246244 else :
247- await self .postgres_store .db .simple_insert (
245+ await self .postgres_store .db_pool .simple_insert (
248246 table = "port_from_sqlite3" ,
249247 values = {
250248 "table_name" : table ,
@@ -274,7 +272,7 @@ class Porter(object):
274272
275273 await self .postgres_store .execute (delete_all )
276274
277- await self .postgres_store .db .simple_insert (
275+ await self .postgres_store .db_pool .simple_insert (
278276 table = "port_from_sqlite3" ,
279277 values = {"table_name" : table , "forward_rowid" : 1 , "backward_rowid" : 0 },
280278 )
@@ -318,7 +316,7 @@ class Porter(object):
318316 if table == "user_directory_stream_pos" :
319317 # We need to make sure there is a single row, `(X, null), as that is
320318 # what synapse expects to be there.
321- await self .postgres_store .db .simple_insert (
319+ await self .postgres_store .db_pool .simple_insert (
322320 table = table , values = {"stream_id" : None }
323321 )
324322 self .progress .update (table , table_size ) # Mark table as done
@@ -359,7 +357,7 @@ class Porter(object):
359357
360358 return headers , forward_rows , backward_rows
361359
362- headers , frows , brows = await self .sqlite_store .db .runInteraction (
360+ headers , frows , brows = await self .sqlite_store .db_pool .runInteraction (
363361 "select" , r
364362 )
365363
@@ -375,7 +373,7 @@ class Porter(object):
375373 def insert (txn ):
376374 self .postgres_store .insert_many_txn (txn , table , headers [1 :], rows )
377375
378- self .postgres_store .db .simple_update_one_txn (
376+ self .postgres_store .db_pool .simple_update_one_txn (
379377 txn ,
380378 table = "port_from_sqlite3" ,
381379 keyvalues = {"table_name" : table },
@@ -413,7 +411,7 @@ class Porter(object):
413411
414412 return headers , rows
415413
416- headers , rows = await self .sqlite_store .db .runInteraction ("select" , r )
414+ headers , rows = await self .sqlite_store .db_pool .runInteraction ("select" , r )
417415
418416 if rows :
419417 forward_chunk = rows [- 1 ][0 ] + 1
@@ -451,7 +449,7 @@ class Porter(object):
451449 ],
452450 )
453451
454- self .postgres_store .db .simple_update_one_txn (
452+ self .postgres_store .db_pool .simple_update_one_txn (
455453 txn ,
456454 table = "port_from_sqlite3" ,
457455 keyvalues = {"table_name" : "event_search" },
@@ -494,15 +492,15 @@ class Porter(object):
494492 db_conn , allow_outdated_version = allow_outdated_version
495493 )
496494 prepare_database (db_conn , engine , config = self .hs_config )
497- store = Store (Database (hs , db_config , engine ), db_conn , hs )
495+ store = Store (DatabasePool (hs , db_config , engine ), db_conn , hs )
498496 db_conn .commit ()
499497
500498 return store
501499
502500 async def run_background_updates_on_postgres (self ):
503501 # Manually apply all background updates on the PostgreSQL database.
504502 postgres_ready = (
505- await self .postgres_store .db .updates .has_completed_background_updates ()
503+ await self .postgres_store .db_pool .updates .has_completed_background_updates ()
506504 )
507505
508506 if not postgres_ready :
@@ -511,9 +509,9 @@ class Porter(object):
511509 self .progress .set_state ("Running background updates on PostgreSQL" )
512510
513511 while not postgres_ready :
514- await self .postgres_store .db .updates .do_next_background_update (100 )
512+ await self .postgres_store .db_pool .updates .do_next_background_update (100 )
515513 postgres_ready = await (
516- self .postgres_store .db .updates .has_completed_background_updates ()
514+ self .postgres_store .db_pool .updates .has_completed_background_updates ()
517515 )
518516
519517 async def run (self ):
@@ -534,7 +532,7 @@ class Porter(object):
534532
535533 # Check if all background updates are done, abort if not.
536534 updates_complete = (
537- await self .sqlite_store .db .updates .has_completed_background_updates ()
535+ await self .sqlite_store .db_pool .updates .has_completed_background_updates ()
538536 )
539537 if not updates_complete :
540538 end_error = (
@@ -576,22 +574,24 @@ class Porter(object):
576574 )
577575
578576 try :
579- await self .postgres_store .db .runInteraction ("alter_table" , alter_table )
577+ await self .postgres_store .db_pool .runInteraction (
578+ "alter_table" , alter_table
579+ )
580580 except Exception :
581581 # On Error Resume Next
582582 pass
583583
584- await self .postgres_store .db .runInteraction (
584+ await self .postgres_store .db_pool .runInteraction (
585585 "create_port_table" , create_port_table
586586 )
587587
588588 # Step 2. Get tables.
589589 self .progress .set_state ("Fetching tables" )
590- sqlite_tables = await self .sqlite_store .db .simple_select_onecol (
590+ sqlite_tables = await self .sqlite_store .db_pool .simple_select_onecol (
591591 table = "sqlite_master" , keyvalues = {"type" : "table" }, retcol = "name"
592592 )
593593
594- postgres_tables = await self .postgres_store .db .simple_select_onecol (
594+ postgres_tables = await self .postgres_store .db_pool .simple_select_onecol (
595595 table = "information_schema.tables" ,
596596 keyvalues = {},
597597 retcol = "distinct table_name" ,
@@ -692,7 +692,7 @@ class Porter(object):
692692
693693 return headers , [r for r in rows if r [ts_ind ] < yesterday ]
694694
695- headers , rows = await self .sqlite_store .db .runInteraction ("select" , r )
695+ headers , rows = await self .sqlite_store .db_pool .runInteraction ("select" , r )
696696
697697 rows = self ._convert_rows ("sent_transactions" , headers , rows )
698698
@@ -725,7 +725,7 @@ class Porter(object):
725725 next_chunk = await self .sqlite_store .execute (get_start_id )
726726 next_chunk = max (max_inserted_rowid + 1 , next_chunk )
727727
728- await self .postgres_store .db .simple_insert (
728+ await self .postgres_store .db_pool .simple_insert (
729729 table = "port_from_sqlite3" ,
730730 values = {
731731 "table_name" : "sent_transactions" ,
@@ -794,14 +794,14 @@ class Porter(object):
794794 next_id = curr_id + 1
795795 txn .execute ("ALTER SEQUENCE state_group_id_seq RESTART WITH %s" , (next_id ,))
796796
797- return self .postgres_store .db .runInteraction ("setup_state_group_id_seq" , r )
797+ return self .postgres_store .db_pool .runInteraction ("setup_state_group_id_seq" , r )
798798
799799 def _setup_user_id_seq (self ):
800800 def r (txn ):
801801 next_id = find_max_generated_user_id_localpart (txn ) + 1
802802 txn .execute ("ALTER SEQUENCE user_id_seq RESTART WITH %s" , (next_id ,))
803803
804- return self .postgres_store .db .runInteraction ("setup_user_id_seq" , r )
804+ return self .postgres_store .db_pool .runInteraction ("setup_user_id_seq" , r )
805805
806806
807807##############################################
0 commit comments