Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit b59d285

Browse files
Fizzadarbabolivier
andauthored
Db txn set isolation level (#11799)
Co-authored-by: Brendan Abolivier <[email protected]>
1 parent fc8598b commit b59d285

File tree

5 files changed

+61
-5
lines changed

5 files changed

+61
-5
lines changed

changelog.d/11799.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Preparation for reducing Postgres serialization errors: allow setting transaction isolation level. Contributed by Nick @ Beeper.

synapse/storage/database.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,7 @@ async def runInteraction(
702702
func: Callable[..., R],
703703
*args: Any,
704704
db_autocommit: bool = False,
705+
isolation_level: Optional[int] = None,
705706
**kwargs: Any,
706707
) -> R:
707708
"""Starts a transaction on the database and runs a given function
@@ -724,6 +725,7 @@ async def runInteraction(
724725
called multiple times if the transaction is retried, so must
725726
correctly handle that case.
726727
728+
isolation_level: Set the server isolation level for this transaction.
727729
args: positional args to pass to `func`
728730
kwargs: named args to pass to `func`
729731
@@ -763,6 +765,7 @@ async def runWithConnection(
763765
func: Callable[..., R],
764766
*args: Any,
765767
db_autocommit: bool = False,
768+
isolation_level: Optional[int] = None,
766769
**kwargs: Any,
767770
) -> R:
768771
"""Wraps the .runWithConnection() method on the underlying db_pool.
@@ -775,6 +778,7 @@ async def runWithConnection(
775778
db_autocommit: Whether to run the function in "autocommit" mode,
776779
i.e. outside of a transaction. This is useful for transaction
777780
that are only a single query. Currently only affects postgres.
781+
isolation_level: Set the server isolation level for this transaction.
778782
kwargs: named args to pass to `func`
779783
780784
Returns:
@@ -834,6 +838,10 @@ def inner_func(conn, *args, **kwargs):
834838
try:
835839
if db_autocommit:
836840
self.engine.attempt_to_set_autocommit(conn, True)
841+
if isolation_level is not None:
842+
self.engine.attempt_to_set_isolation_level(
843+
conn, isolation_level
844+
)
837845

838846
db_conn = LoggingDatabaseConnection(
839847
conn, self.engine, "runWithConnection"
@@ -842,6 +850,8 @@ def inner_func(conn, *args, **kwargs):
842850
finally:
843851
if db_autocommit:
844852
self.engine.attempt_to_set_autocommit(conn, False)
853+
if isolation_level:
854+
self.engine.attempt_to_set_isolation_level(conn, None)
845855

846856
return await make_deferred_yieldable(
847857
self._db_pool.runWithConnection(inner_func, *args, **kwargs)

synapse/storage/engines/_base.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,18 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import abc
15-
from typing import Generic, TypeVar
15+
from enum import IntEnum
16+
from typing import Generic, Optional, TypeVar
1617

1718
from synapse.storage.types import Connection
1819

1920

21+
class IsolationLevel(IntEnum):
22+
READ_COMMITTED: int = 1
23+
REPEATABLE_READ: int = 2
24+
SERIALIZABLE: int = 3
25+
26+
2027
class IncorrectDatabaseSetup(RuntimeError):
2128
pass
2229

@@ -109,3 +116,13 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
109116
commit/rollback the connections.
110117
"""
111118
...
119+
120+
@abc.abstractmethod
121+
def attempt_to_set_isolation_level(
122+
self, conn: Connection, isolation_level: Optional[int]
123+
):
124+
"""Attempt to set the connections isolation level.
125+
126+
Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default.
127+
"""
128+
...

synapse/storage/engines/postgres.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@
1313
# limitations under the License.
1414

1515
import logging
16+
from typing import Mapping, Optional
1617

17-
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
18+
from synapse.storage.engines._base import (
19+
BaseDatabaseEngine,
20+
IncorrectDatabaseSetup,
21+
IsolationLevel,
22+
)
1823
from synapse.storage.types import Connection
1924

2025
logger = logging.getLogger(__name__)
@@ -34,6 +39,15 @@ def _disable_bytes_adapter(_):
3439
self.synchronous_commit = database_config.get("synchronous_commit", True)
3540
self._version = None # unknown as yet
3641

42+
self.isolation_level_map: Mapping[int, int] = {
43+
IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED,
44+
IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
45+
IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE,
46+
}
47+
self.default_isolation_level = (
48+
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
49+
)
50+
3751
@property
3852
def single_threaded(self) -> bool:
3953
return False
@@ -104,9 +118,7 @@ def convert_param_style(self, sql):
104118
return sql.replace("?", "%s")
105119

106120
def on_new_connection(self, db_conn):
107-
db_conn.set_isolation_level(
108-
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
109-
)
121+
db_conn.set_isolation_level(self.default_isolation_level)
110122

111123
# Set the bytea output to escape, vs the default of hex
112124
cursor = db_conn.cursor()
@@ -175,3 +187,12 @@ def in_transaction(self, conn: Connection) -> bool:
175187

176188
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
177189
return conn.set_session(autocommit=autocommit) # type: ignore
190+
191+
def attempt_to_set_isolation_level(
192+
self, conn: Connection, isolation_level: Optional[int]
193+
):
194+
if isolation_level is None:
195+
isolation_level = self.default_isolation_level
196+
else:
197+
isolation_level = self.isolation_level_map[isolation_level]
198+
return conn.set_isolation_level(isolation_level) # type: ignore

synapse/storage/engines/sqlite.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import struct
1616
import threading
1717
import typing
18+
from typing import Optional
1819

1920
from synapse.storage.engines import BaseDatabaseEngine
2021
from synapse.storage.types import Connection
@@ -122,6 +123,12 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
122123
# set the connection to autocommit mode.
123124
pass
124125

126+
def attempt_to_set_isolation_level(
127+
self, conn: Connection, isolation_level: Optional[int]
128+
):
129+
# All transactions are SERIALIZABLE by default in sqllite
130+
pass
131+
125132

126133
# Following functions taken from: https://github.com/coleifer/peewee
127134

0 commit comments

Comments
 (0)