Skip to content

Commit f2def4b

Browse files
authored
Allow tx timeout to be 0 and send it (#642)
Altering TestKit back end to treat any exceptions raised inside the driver code (using trace back stack analysis) as `DriverError`s Update tx timeout docs
1 parent adb446a commit f2def4b

File tree

10 files changed

+152
-85
lines changed

10 files changed

+152
-85
lines changed

neo4j/_async/io/_bolt3.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None,
229229
extra["tx_metadata"] = dict(metadata)
230230
except TypeError:
231231
raise TypeError("Metadata must be coercible to a dict")
232-
if timeout:
232+
if timeout is not None:
233233
try:
234-
extra["tx_timeout"] = int(1000 * timeout)
234+
extra["tx_timeout"] = int(1000 * float(timeout))
235235
except TypeError:
236236
raise TypeError("Timeout must be specified as a number of seconds")
237+
if extra["tx_timeout"] < 0:
238+
raise ValueError("Timeout must be a positive number or 0.")
237239
fields = (query, parameters, extra)
238240
log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields)))
239241
if query.upper() == u"COMMIT":
@@ -281,11 +283,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
281283
extra["tx_metadata"] = dict(metadata)
282284
except TypeError:
283285
raise TypeError("Metadata must be coercible to a dict")
284-
if timeout:
286+
if timeout is not None:
285287
try:
286-
extra["tx_timeout"] = int(1000 * timeout)
288+
extra["tx_timeout"] = int(1000 * float(timeout))
287289
except TypeError:
288290
raise TypeError("Timeout must be specified as a number of seconds")
291+
if extra["tx_timeout"] < 0:
292+
raise ValueError("Timeout must be a positive number or 0.")
289293
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
290294
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))
291295

neo4j/_async/io/_bolt4.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None,
181181
extra["tx_metadata"] = dict(metadata)
182182
except TypeError:
183183
raise TypeError("Metadata must be coercible to a dict")
184-
if timeout:
184+
if timeout is not None:
185185
try:
186-
extra["tx_timeout"] = int(1000 * timeout)
186+
extra["tx_timeout"] = int(1000 * float(timeout))
187187
except TypeError:
188188
raise TypeError("Timeout must be specified as a number of seconds")
189+
if extra["tx_timeout"] < 0:
190+
raise ValueError("Timeout must be a positive number or 0.")
189191
fields = (query, parameters, extra)
190192
log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields)))
191193
if query.upper() == u"COMMIT":
@@ -232,11 +234,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
232234
extra["tx_metadata"] = dict(metadata)
233235
except TypeError:
234236
raise TypeError("Metadata must be coercible to a dict")
235-
if timeout:
237+
if timeout is not None:
236238
try:
237-
extra["tx_timeout"] = int(1000 * timeout)
239+
extra["tx_timeout"] = int(1000 * float(timeout))
238240
except TypeError:
239241
raise TypeError("Timeout must be specified as a number of seconds")
242+
if extra["tx_timeout"] < 0:
243+
raise ValueError("Timeout must be a positive number or 0.")
240244
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
241245
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))
242246

@@ -492,12 +496,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None,
492496
extra["tx_metadata"] = dict(metadata)
493497
except TypeError:
494498
raise TypeError("Metadata must be coercible to a dict")
495-
if timeout:
499+
if timeout is not None:
496500
try:
497-
extra["tx_timeout"] = int(1000 * timeout)
501+
extra["tx_timeout"] = int(1000 * float(timeout))
498502
except TypeError:
499-
raise TypeError("Timeout must be specified as a number of "
500-
"seconds")
503+
raise TypeError("Timeout must be specified as a number of seconds")
504+
if extra["tx_timeout"] < 0:
505+
raise ValueError("Timeout must be a positive number or 0.")
501506
fields = (query, parameters, extra)
502507
log.debug("[#%04X] C: RUN %s", self.local_port,
503508
" ".join(map(repr, fields)))
@@ -527,11 +532,12 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
527532
extra["tx_metadata"] = dict(metadata)
528533
except TypeError:
529534
raise TypeError("Metadata must be coercible to a dict")
530-
if timeout:
535+
if timeout is not None:
531536
try:
532-
extra["tx_timeout"] = int(1000 * timeout)
537+
extra["tx_timeout"] = int(1000 * float(timeout))
533538
except TypeError:
534-
raise TypeError("Timeout must be specified as a number of "
535-
"seconds")
539+
raise TypeError("Timeout must be specified as a number of seconds")
540+
if extra["tx_timeout"] < 0:
541+
raise ValueError("Timeout must be a positive number or 0.")
536542
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
537543
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))

neo4j/_sync/io/_bolt3.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None,
229229
extra["tx_metadata"] = dict(metadata)
230230
except TypeError:
231231
raise TypeError("Metadata must be coercible to a dict")
232-
if timeout:
232+
if timeout is not None:
233233
try:
234-
extra["tx_timeout"] = int(1000 * timeout)
234+
extra["tx_timeout"] = int(1000 * float(timeout))
235235
except TypeError:
236236
raise TypeError("Timeout must be specified as a number of seconds")
237+
if extra["tx_timeout"] < 0:
238+
raise ValueError("Timeout must be a positive number or 0.")
237239
fields = (query, parameters, extra)
238240
log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields)))
239241
if query.upper() == u"COMMIT":
@@ -281,11 +283,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
281283
extra["tx_metadata"] = dict(metadata)
282284
except TypeError:
283285
raise TypeError("Metadata must be coercible to a dict")
284-
if timeout:
286+
if timeout is not None:
285287
try:
286-
extra["tx_timeout"] = int(1000 * timeout)
288+
extra["tx_timeout"] = int(1000 * float(timeout))
287289
except TypeError:
288290
raise TypeError("Timeout must be specified as a number of seconds")
291+
if extra["tx_timeout"] < 0:
292+
raise ValueError("Timeout must be a positive number or 0.")
289293
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
290294
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))
291295

neo4j/_sync/io/_bolt4.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None,
181181
extra["tx_metadata"] = dict(metadata)
182182
except TypeError:
183183
raise TypeError("Metadata must be coercible to a dict")
184-
if timeout:
184+
if timeout is not None:
185185
try:
186-
extra["tx_timeout"] = int(1000 * timeout)
186+
extra["tx_timeout"] = int(1000 * float(timeout))
187187
except TypeError:
188188
raise TypeError("Timeout must be specified as a number of seconds")
189+
if extra["tx_timeout"] < 0:
190+
raise ValueError("Timeout must be a positive number or 0.")
189191
fields = (query, parameters, extra)
190192
log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields)))
191193
if query.upper() == u"COMMIT":
@@ -232,11 +234,13 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
232234
extra["tx_metadata"] = dict(metadata)
233235
except TypeError:
234236
raise TypeError("Metadata must be coercible to a dict")
235-
if timeout:
237+
if timeout is not None:
236238
try:
237-
extra["tx_timeout"] = int(1000 * timeout)
239+
extra["tx_timeout"] = int(1000 * float(timeout))
238240
except TypeError:
239241
raise TypeError("Timeout must be specified as a number of seconds")
242+
if extra["tx_timeout"] < 0:
243+
raise ValueError("Timeout must be a positive number or 0.")
240244
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
241245
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))
242246

@@ -492,12 +496,13 @@ def run(self, query, parameters=None, mode=None, bookmarks=None,
492496
extra["tx_metadata"] = dict(metadata)
493497
except TypeError:
494498
raise TypeError("Metadata must be coercible to a dict")
495-
if timeout:
499+
if timeout is not None:
496500
try:
497-
extra["tx_timeout"] = int(1000 * timeout)
501+
extra["tx_timeout"] = int(1000 * float(timeout))
498502
except TypeError:
499-
raise TypeError("Timeout must be specified as a number of "
500-
"seconds")
503+
raise TypeError("Timeout must be specified as a number of seconds")
504+
if extra["tx_timeout"] < 0:
505+
raise ValueError("Timeout must be a positive number or 0.")
501506
fields = (query, parameters, extra)
502507
log.debug("[#%04X] C: RUN %s", self.local_port,
503508
" ".join(map(repr, fields)))
@@ -527,11 +532,12 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
527532
extra["tx_metadata"] = dict(metadata)
528533
except TypeError:
529534
raise TypeError("Metadata must be coercible to a dict")
530-
if timeout:
535+
if timeout is not None:
531536
try:
532-
extra["tx_timeout"] = int(1000 * timeout)
537+
extra["tx_timeout"] = int(1000 * float(timeout))
533538
except TypeError:
534-
raise TypeError("Timeout must be specified as a number of "
535-
"seconds")
539+
raise TypeError("Timeout must be specified as a number of seconds")
540+
if extra["tx_timeout"] < 0:
541+
raise ValueError("Timeout must be a positive number or 0.")
536542
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
537543
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))

neo4j/work/query.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class Query:
2424
:param metadata: metadata attached to the query.
2525
:type metadata: dict
2626
:param timeout: seconds.
27-
:type timeout: int
27+
:type timeout: float or None
2828
"""
2929
def __init__(self, text, metadata=None, timeout=None):
3030
self.text = text
@@ -59,8 +59,10 @@ def count_people_tx(tx):
5959
Transactions that execute longer than the configured timeout will be terminated by the database.
6060
This functionality allows to limit query/transaction execution time.
6161
Specified timeout overrides the default timeout configured in the database using ``dbms.transaction.timeout`` setting.
62-
Value should not represent a duration of zero or negative duration.
63-
:type timeout: int
62+
Value should not represent a negative duration.
63+
A zero duration will make the transaction execute indefinitely.
64+
None will use the default timeout configured in the database.
65+
:type timeout: float or None
6466
"""
6567

6668
def wrapper(f):

testkitbackend/_async/backend.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
dumps,
2626
loads,
2727
)
28+
from pathlib import Path
2829
import traceback
2930

3031
from neo4j._exceptions import BoltError
@@ -42,6 +43,10 @@
4243
from ..backend import Request
4344

4445

46+
TESTKIT_BACKEND_PATH = Path(__file__).absolute().resolve().parents[1]
47+
DRIVER_PATH = TESTKIT_BACKEND_PATH.parent / "neo4j"
48+
49+
4550
class AsyncBackend:
4651
def __init__(self, rd, wr):
4752
self._rd = rd
@@ -81,6 +86,30 @@ async def process_request(self):
8186
request = request + line
8287
return False
8388

89+
@staticmethod
90+
def _exc_stems_from_driver(exc):
91+
stack = traceback.extract_tb(exc.__traceback__)
92+
for frame in stack[-1:1:-1]:
93+
p = Path(frame.filename)
94+
if TESTKIT_BACKEND_PATH in p.parents:
95+
return False
96+
if DRIVER_PATH in p.parents:
97+
return True
98+
99+
async def _handle_driver_exc(self, exc):
100+
log.debug(traceback.format_exc())
101+
if isinstance(exc, Neo4jError):
102+
msg = "" if exc.message is None else str(exc.message)
103+
else:
104+
msg = str(exc.args[0]) if exc.args else ""
105+
106+
key = self.next_key()
107+
self.errors[key] = exc
108+
payload = {"id": key, "errorType": str(type(exc)), "msg": msg}
109+
if isinstance(exc, Neo4jError):
110+
payload["code"] = exc.code
111+
await self.send_response("DriverError", payload)
112+
84113
async def _process(self, request):
85114
""" Process a received request by retrieving handler that
86115
corresponds to the request name.
@@ -104,24 +133,16 @@ async def _process(self, request):
104133
)
105134
except (Neo4jError, DriverError, UnsupportedServerProduct,
106135
BoltError) as e:
107-
log.debug(traceback.format_exc())
108-
if isinstance(e, Neo4jError):
109-
msg = "" if e.message is None else str(e.message)
110-
else:
111-
msg = str(e.args[0]) if e.args else ""
112-
113-
key = self.next_key()
114-
self.errors[key] = e
115-
payload = {"id": key, "errorType": str(type(e)), "msg": msg}
116-
if isinstance(e, Neo4jError):
117-
payload["code"] = e.code
118-
await self.send_response("DriverError", payload)
136+
await self._handle_driver_exc(e)
119137
except requests.FrontendError as e:
120138
await self.send_response("FrontendError", {"msg": str(e)})
121-
except Exception:
122-
tb = traceback.format_exc()
123-
log.error(tb)
124-
await self.send_response("BackendError", {"msg": tb})
139+
except Exception as e:
140+
if self._exc_stems_from_driver(e):
141+
await self._handle_driver_exc(e)
142+
else:
143+
tb = traceback.format_exc()
144+
log.error(tb)
145+
await self.send_response("BackendError", {"msg": tb})
125146

126147
async def send_response(self, name, data):
127148
""" Sends a response to backend.

testkitbackend/_async/requests.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ async def SessionClose(backend, data):
253253
async def SessionBeginTransaction(backend, data):
254254
key = data["sessionId"]
255255
session = backend.sessions[key].session
256-
metadata, timeout = fromtestkit.to_meta_and_timeout(data)
257-
tx = await session.begin_transaction(metadata=metadata, timeout=timeout)
256+
tx_kwargs = fromtestkit.to_tx_kwargs(data)
257+
tx = await session.begin_transaction(**tx_kwargs)
258258
key = backend.next_key()
259259
backend.transactions[key] = tx
260260
await backend.send_response("Transaction", {"id": key})
@@ -272,9 +272,9 @@ async def transactionFunc(backend, data, is_read):
272272
key = data["sessionId"]
273273
session_tracker = backend.sessions[key]
274274
session = session_tracker.session
275-
metadata, timeout = fromtestkit.to_meta_and_timeout(data)
275+
tx_kwargs = fromtestkit.to_tx_kwargs(data)
276276

277-
@neo4j.unit_of_work(metadata=metadata, timeout=timeout)
277+
@neo4j.unit_of_work(**tx_kwargs)
278278
async def func(tx):
279279
txkey = backend.next_key()
280280
backend.transactions[txkey] = tx

0 commit comments

Comments
 (0)