Skip to content

Commit d228f08

Browse files
committed
wip incorporate design changes
1 parent e905b9b commit d228f08

File tree

11 files changed

+554
-91
lines changed

11 files changed

+554
-91
lines changed

pymongo/asynchronous/pool.py

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -820,8 +820,7 @@ def __init__(
820820
async def ready(self) -> None:
821821
# Take the lock to avoid the race condition described in PYTHON-2699.
822822
async with self.lock:
823-
# Do not set the pool as ready if in backoff.
824-
if self._backoff:
823+
if self.state == PoolState.BACKOFF:
825824
return
826825
if self.state != PoolState.READY:
827826
self.state = PoolState.READY
@@ -847,14 +846,11 @@ async def _reset(
847846
pause: bool = True,
848847
service_id: Optional[ObjectId] = None,
849848
interrupt_connections: bool = False,
850-
from_server_description: bool = False,
851849
) -> None:
852850
old_state = self.state
853851
async with self.size_cond:
854852
if self.closed:
855853
return
856-
if from_server_description and self.state == PoolState.BACKOFF:
857-
return
858854
# Clear the backoff amount.
859855
self._backoff = 0
860856
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
@@ -954,16 +950,12 @@ async def update_is_writable(self, is_writable: Optional[bool]) -> None:
954950
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
955951

956952
async def reset(
957-
self,
958-
service_id: Optional[ObjectId] = None,
959-
interrupt_connections: bool = False,
960-
from_server_description: bool = False,
953+
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
961954
) -> None:
962955
await self._reset(
963956
close=False,
964957
service_id=service_id,
965958
interrupt_connections=interrupt_connections,
966-
from_server_description=from_server_description,
967959
)
968960

969961
async def reset_without_pause(self) -> None:
@@ -1044,27 +1036,30 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
10441036
self.requests -= 1
10451037
self.size_cond.notify()
10461038

1047-
def _handle_connection_error(self, error: BaseException, phase: str) -> None:
1039+
async def _handle_connection_error(self, error: BaseException, phase: str) -> None:
10481040
# Handle system overload condition for non-sdam pools.
10491041
# Look for an AutoReconnect or NetworkTimeout error.
10501042
# If found, set backoff and add error labels.
10511043
if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout):
10521044
return
10531045
error._add_error_label("SystemOverloadedError") # type:ignore[attr-defined]
10541046
error._add_error_label("RetryableError") # type:ignore[attr-defined]
1055-
self.backoff()
1047+
await self.backoff()
10561048

1057-
def backoff(self) -> None:
1049+
async def backoff(self) -> None:
10581050
"""Set/increase backoff mode."""
1059-
self._backoff += 1
1060-
backoff_duration_sec = _backoff(self._backoff)
1061-
backoff_duration_ms = int(backoff_duration_sec * 1000)
1062-
if self.state != PoolState.BACKOFF:
1063-
self.state = PoolState.BACKOFF
1064-
if self.enabled_for_cmap:
1065-
assert self.opts._event_listeners is not None
1066-
self.opts._event_listeners.publish_pool_backoff(self.address, backoff_duration_ms)
1067-
self._backoff_connection_time = backoff_duration_sec + time.monotonic()
1051+
async with self.lock:
1052+
self._backoff += 1
1053+
backoff_duration_sec = _backoff(self._backoff)
1054+
backoff_duration_ms = int(backoff_duration_sec * 1000)
1055+
if self.state != PoolState.BACKOFF:
1056+
self.state = PoolState.BACKOFF
1057+
if self.enabled_for_cmap:
1058+
assert self.opts._event_listeners is not None
1059+
self.opts._event_listeners.publish_pool_backoff(
1060+
self.address, self._backoff, backoff_duration_ms
1061+
)
1062+
self._backoff_connection_time = backoff_duration_sec + time.monotonic()
10681063

10691064
# Log the pool backoff message.
10701065
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
@@ -1074,6 +1069,7 @@ def backoff(self) -> None:
10741069
clientId=self._client_id,
10751070
serverHost=self.address[0],
10761071
serverPort=self.address[1],
1072+
attempt=self._backoff,
10771073
durationMS=backoff_duration_ms,
10781074
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
10791075
error=ConnectionClosedReason.POOL_BACKOFF,
@@ -1136,7 +1132,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11361132
error=ConnectionClosedReason.ERROR,
11371133
)
11381134
if context["has_created_socket"]:
1139-
self._handle_connection_error(error, "handshake")
1135+
await self._handle_connection_error(error, "handshake")
11401136
if isinstance(error, (IOError, OSError, *SSLErrors)):
11411137
details = _get_timeout_details(self.opts)
11421138
_raise_connection_failure(self.address, error, timeout_details=details)
@@ -1148,9 +1144,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11481144
self.active_contexts.discard(tmp_context)
11491145
if tmp_context.cancelled:
11501146
conn.cancel_context.cancel()
1147+
has_completed_hello = False
11511148
try:
11521149
if not self.is_sdam:
11531150
await conn.hello()
1151+
has_completed_hello = True
11541152
self.is_writable = conn.is_writable
11551153
if handler:
11561154
handler.contribute_socket(conn, completed_handshake=False)
@@ -1160,7 +1158,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11601158
except BaseException as e:
11611159
async with self.lock:
11621160
self.active_contexts.discard(conn.cancel_context)
1163-
self._handle_connection_error(e, "hello")
1161+
if not has_completed_hello:
1162+
await self._handle_connection_error(e, "hello")
11641163
await conn.close_conn(ConnectionClosedReason.ERROR)
11651164
raise
11661165

pymongo/asynchronous/topology.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
_SDAMStatusMessage,
5858
_ServerSelectionStatusMessage,
5959
)
60+
from pymongo.pool import PoolState
6061
from pymongo.pool_options import PoolOptions
6162
from pymongo.server_description import ServerDescription
6263
from pymongo.server_selectors import (
@@ -485,7 +486,7 @@ async def _process_change(
485486
server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single
486487
):
487488
server = self._servers.get(server_description.address)
488-
if server:
489+
if server and server.pool.state != PoolState.BACKOFF:
489490
await server.pool.ready()
490491

491492
suppress_event = sd_old == server_description
@@ -555,9 +556,7 @@ async def on_change(
555556
if reset_pool:
556557
server = self._servers.get(server_description.address)
557558
if server:
558-
await server.pool.reset(
559-
interrupt_connections=interrupt_connections, from_server_description=True
560-
)
559+
await server.pool.reset(interrupt_connections=interrupt_connections)
561560

562561
async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
563562
"""Process a new seedlist on an opened topology.

pymongo/monitoring.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -931,17 +931,24 @@ class PoolBackoffEvent(_PoolEvent):
931931
932932
:param address: The address (host, port) pair of the server this Pool is
933933
attempting to connect to.
934+
:param attempt: The backoff attempt.
934935
:param duration_ms: The backoff duration in ms.
935936
936937
.. versionadded:: 4.16
937938
"""
938939

939-
__slots__ = ("__duration_ms",)
940+
__slots__ = ("__attempt", "__duration_ms")
940941

941-
def __init__(self, address: _Address, duration_ms: int) -> None:
942+
def __init__(self, address: _Address, attempt: int, duration_ms: int) -> None:
942943
super().__init__(address)
944+
self.__attempt = attempt
943945
self.__duration_ms = duration_ms
944946

947+
@property
948+
def attempt(self) -> int:
949+
"""The backoff attempt."""
950+
return self.__attempt
951+
945952
@property
946953
def duration_ms(self) -> int:
947954
"""The backoff duration in ms."""
@@ -1864,9 +1871,9 @@ def publish_pool_closed(self, address: _Address) -> None:
18641871
except Exception:
18651872
_handle_exception()
18661873

1867-
def publish_pool_backoff(self, address: _Address, attempt: int) -> None:
1874+
def publish_pool_backoff(self, address: _Address, attempt: int, duration_ms: int) -> None:
18681875
"""Publish a :class:`PoolBackoffEvent` to all pool listeners."""
1869-
event = PoolBackoffEvent(address, attempt)
1876+
event = PoolBackoffEvent(address, attempt, duration_ms)
18701877
for subscriber in self.__cmap_listeners:
18711878
try:
18721879
subscriber.pool_backoff(event)

pymongo/synchronous/pool.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -818,8 +818,7 @@ def __init__(
818818
def ready(self) -> None:
819819
# Take the lock to avoid the race condition described in PYTHON-2699.
820820
with self.lock:
821-
# Do not set the pool as ready if in backoff.
822-
if self._backoff:
821+
if self.state == PoolState.BACKOFF:
823822
return
824823
if self.state != PoolState.READY:
825824
self.state = PoolState.READY
@@ -845,14 +844,11 @@ def _reset(
845844
pause: bool = True,
846845
service_id: Optional[ObjectId] = None,
847846
interrupt_connections: bool = False,
848-
from_server_description: bool = False,
849847
) -> None:
850848
old_state = self.state
851849
with self.size_cond:
852850
if self.closed:
853851
return
854-
if from_server_description and self.state == PoolState.BACKOFF:
855-
return
856852
# Clear the backoff amount.
857853
self._backoff = 0
858854
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
@@ -952,16 +948,12 @@ def update_is_writable(self, is_writable: Optional[bool]) -> None:
952948
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
953949

954950
def reset(
955-
self,
956-
service_id: Optional[ObjectId] = None,
957-
interrupt_connections: bool = False,
958-
from_server_description: bool = False,
951+
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
959952
) -> None:
960953
self._reset(
961954
close=False,
962955
service_id=service_id,
963956
interrupt_connections=interrupt_connections,
964-
from_server_description=from_server_description,
965957
)
966958

967959
def reset_without_pause(self) -> None:
@@ -1054,15 +1046,18 @@ def _handle_connection_error(self, error: BaseException, phase: str) -> None:
10541046

10551047
def backoff(self) -> None:
10561048
"""Set/increase backoff mode."""
1057-
self._backoff += 1
1058-
backoff_duration_sec = _backoff(self._backoff)
1059-
backoff_duration_ms = int(backoff_duration_sec * 1000)
1060-
if self.state != PoolState.BACKOFF:
1061-
self.state = PoolState.BACKOFF
1062-
if self.enabled_for_cmap:
1063-
assert self.opts._event_listeners is not None
1064-
self.opts._event_listeners.publish_pool_backoff(self.address, backoff_duration_ms)
1065-
self._backoff_connection_time = backoff_duration_sec + time.monotonic()
1049+
with self.lock:
1050+
self._backoff += 1
1051+
backoff_duration_sec = _backoff(self._backoff)
1052+
backoff_duration_ms = int(backoff_duration_sec * 1000)
1053+
if self.state != PoolState.BACKOFF:
1054+
self.state = PoolState.BACKOFF
1055+
if self.enabled_for_cmap:
1056+
assert self.opts._event_listeners is not None
1057+
self.opts._event_listeners.publish_pool_backoff(
1058+
self.address, self._backoff, backoff_duration_ms
1059+
)
1060+
self._backoff_connection_time = backoff_duration_sec + time.monotonic()
10661061

10671062
# Log the pool backoff message.
10681063
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
@@ -1072,6 +1067,7 @@ def backoff(self) -> None:
10721067
clientId=self._client_id,
10731068
serverHost=self.address[0],
10741069
serverPort=self.address[1],
1070+
attempt=self._backoff,
10751071
durationMS=backoff_duration_ms,
10761072
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
10771073
error=ConnectionClosedReason.POOL_BACKOFF,
@@ -1146,9 +1142,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
11461142
self.active_contexts.discard(tmp_context)
11471143
if tmp_context.cancelled:
11481144
conn.cancel_context.cancel()
1145+
has_completed_hello = False
11491146
try:
11501147
if not self.is_sdam:
11511148
conn.hello()
1149+
has_completed_hello = True
11521150
self.is_writable = conn.is_writable
11531151
if handler:
11541152
handler.contribute_socket(conn, completed_handshake=False)
@@ -1158,7 +1156,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
11581156
except BaseException as e:
11591157
with self.lock:
11601158
self.active_contexts.discard(conn.cancel_context)
1161-
self._handle_connection_error(e, "hello")
1159+
if not has_completed_hello:
1160+
self._handle_connection_error(e, "hello")
11621161
conn.close_conn(ConnectionClosedReason.ERROR)
11631162
raise
11641163

pymongo/synchronous/topology.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
_SDAMStatusMessage,
5454
_ServerSelectionStatusMessage,
5555
)
56+
from pymongo.pool import PoolState
5657
from pymongo.pool_options import PoolOptions
5758
from pymongo.server_description import ServerDescription
5859
from pymongo.server_selectors import (
@@ -485,7 +486,7 @@ def _process_change(
485486
server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single
486487
):
487488
server = self._servers.get(server_description.address)
488-
if server:
489+
if server and server.pool.state != PoolState.BACKOFF:
489490
server.pool.ready()
490491

491492
suppress_event = sd_old == server_description
@@ -555,9 +556,7 @@ def on_change(
555556
if reset_pool:
556557
server = self._servers.get(server_description.address)
557558
if server:
558-
server.pool.reset(
559-
interrupt_connections=interrupt_connections, from_server_description=True
560-
)
559+
server.pool.reset(interrupt_connections=interrupt_connections)
561560

562561
def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
563562
"""Process a new seedlist on an opened topology.

0 commit comments

Comments
 (0)