From 665c34b767bf48a7fce4248f26d16e4962fb35f3 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Wed, 24 Mar 2021 12:02:29 +0100 Subject: [PATCH 1/8] Improve error handling in io package Don't throw OSError on failed write, but BoltIncompleteCommitError or ServiceUnavailable. Since Python 3.3 IOError is an alias for OSError => canonicalize code. --- neo4j/io/__init__.py | 58 ++++++++++------------- neo4j/io/_bolt3.py | 106 ++++++++++++++++++++----------------------- neo4j/io/_bolt4.py | 102 ++++++++++++++++++++--------------------- neo4j/io/_common.py | 2 +- 4 files changed, 125 insertions(+), 143 deletions(-) diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 51c3db4a9..4280a005c 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -38,60 +38,52 @@ from logging import getLogger from random import choice from select import select -from time import perf_counter - from socket import ( + AF_INET, + AF_INET6, + SHUT_RDWR, + SO_KEEPALIVE, socket, SOL_SOCKET, - SO_KEEPALIVE, - SHUT_RDWR, timeout as SocketTimeout, - AF_INET, - AF_INET6, ) - from ssl import ( HAS_SNI, SSLError, ) - -from struct import ( - pack as struct_pack, -) - from threading import ( + Condition, Lock, RLock, - Condition, ) +from time import perf_counter -from neo4j.addressing import Address -from neo4j.conf import PoolConfig from neo4j._exceptions import ( + BoltHandshakeError, + BoltProtocolError, BoltRoutingError, BoltSecurityError, - BoltProtocolError, - BoltHandshakeError, ) -from neo4j.exceptions import ( - ServiceUnavailable, - ClientError, - SessionExpired, - ReadServiceUnavailable, - WriteServiceUnavailable, - ConfigurationError, - UnsupportedServerProduct, +from neo4j.addressing import Address +from neo4j.api import ( + READ_ACCESS, + Version, + WRITE_ACCESS, ) -from neo4j.routing import RoutingTable from neo4j.conf import ( PoolConfig, WorkspaceConfig, ) -from neo4j.api import ( - READ_ACCESS, - WRITE_ACCESS, - Version, +from neo4j.exceptions import ( + ClientError, + ConfigurationError, + ReadServiceUnavailable, + ServiceUnavailable, + SessionExpired, + UnsupportedServerProduct, + WriteServiceUnavailable, ) +from neo4j.routing import RoutingTable # Set up logger log = getLogger("neo4j") @@ -262,7 +254,7 @@ def open(cls, address, *, auth=None, timeout=None, routing_context=None, **pool_ log.debug("[#%04X] C: %s", s.getsockname()[1], str(error)) s.shutdown(SHUT_RDWR) s.close() - raise error + raise return connection @@ -522,7 +514,7 @@ def deactivate(self, address): connections.remove(conn) try: conn.close() - except IOError: + except OSError: pass if not connections: self.remove(address) @@ -538,7 +530,7 @@ def remove(self, address): for connection in self.connections.pop(address, ()): try: connection.close() - except IOError: + except OSError: pass def close(self): diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index e81c6aea9..bb0257f56 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -19,47 +19,49 @@ # limitations under the License. from collections import deque +from logging import getLogger from ssl import SSLSocket from time import perf_counter + +from neo4j._exceptions import ( + BoltError, + BoltIncompleteCommitError, + BoltProtocolError, +) +from neo4j.addressing import Address +from neo4j.api import ServerInfo from neo4j.api import ( - Version, READ_ACCESS, + Version, ) -from neo4j.io._common import ( - Inbox, - Outbox, - Response, - InitResponse, - CommitResponse, -) -from neo4j.meta import get_user_agent from neo4j.exceptions import ( AuthError, - ServiceUnavailable, + ConfigurationError, DatabaseUnavailable, - NotALeader, + DriverError, ForbiddenOnReadOnlyDatabase, + NotALeader, + ServiceUnavailable, SessionExpired, - ConfigurationError, - UnsupportedServerProduct, -) -from neo4j._exceptions import ( - BoltIncompleteCommitError, - BoltProtocolError, -) -from neo4j.packstream import ( - Unpacker, - Packer, ) from neo4j.io import ( + check_supported_server_product, Bolt, BoltPool, - check_supported_server_product, ) -from neo4j.api import ServerInfo -from neo4j.addressing import Address +from neo4j.io._common import ( + CommitResponse, + Inbox, + InitResponse, + Outbox, + Response, +) +from neo4j.meta import get_user_agent +from neo4j.packstream import ( + Packer, + Unpacker, +) -from logging import getLogger log = getLogger("neo4j") @@ -88,7 +90,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No self.socket = sock self.server_info = ServerInfo(Address(sock.getpeername()), self.PROTOCOL_VERSION) self.outbox = Outbox() - self.inbox = Inbox(self.socket, on_error=self._set_defunct) + self.inbox = Inbox(self.socket, on_error=self._set_defunct_read) self.packer = Packer(self.outbox) self.unpacker = Unpacker(self.inbox) self.responses = deque() @@ -138,7 +140,7 @@ def der_encoded_server_certificate(self): def local_port(self): try: return self.socket.getsockname()[1] - except IOError: + except OSError: return 0 def get_base_headers(self): @@ -292,7 +294,10 @@ def fail(metadata): def _send_all(self): data = self.outbox.view() if data: - self.socket.sendall(data) + try: + self.socket.sendall(data) + except OSError as error: + self._set_defunct_write(error) self.outbox.clear() def send_all(self): @@ -306,17 +311,7 @@ def send_all(self): raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format( self.unresolved_address, self.server_info.address)) - try: - self._send_all() - except (IOError, OSError) as error: - log.error("Failed to write data to connection " - "{!r} ({!r}); ({!r})". - format(self.unresolved_address, - self.server_info.address, - "; ".join(map(repr, error.args)))) - if self.pool: - self.pool.deactivate(address=self.unresolved_address) - raise + self._send_all() def fetch_message(self): """ Receive at least one message from the server, if available. @@ -336,17 +331,7 @@ def fetch_message(self): return 0, 0 # Receive exactly one message - try: - details, summary_signature, summary_metadata = next(self.inbox) - except (IOError, OSError) as error: - log.error("Failed to read data from connection " - "{!r} ({!r}); ({!r})". - format(self.unresolved_address, - self.server_info.address, - "; ".join(map(repr, error.args)))) - if self.pool: - self.pool.deactivate(address=self.unresolved_address) - raise + details, summary_signature, summary_metadata = next(self.inbox) if details: log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # Do not log any data @@ -380,11 +365,20 @@ def fetch_message(self): return len(details), 1 - def _set_defunct(self, error=None): - direct_driver = isinstance(self.pool, BoltPool) + def _set_defunct_read(self, error=None): + message = "Failed to read from defunct connection {!r} ({!r})".format( + self.unresolved_address, self.server_info.address + ) + self._set_defunct(message, error=error) - message = ("Failed to read from defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server_info.address)) + def _set_defunct_write(self, error=None): + message = "Failed to write data to connection {!r} ({!r})".format( + self.unresolved_address, self.server_info.address + ) + self._set_defunct(message, error=error) + + def _set_defunct(self, message, error=None): + direct_driver = isinstance(self.pool, BoltPool) if error: log.error(str(error)) @@ -445,12 +439,12 @@ def close(self): self._append(b"\x02", ()) try: self._send_all() - except: + except (OSError, BoltError, DriverError): pass log.debug("[#%04X] C: ", self.local_port) try: self.socket.close() - except IOError: + except OSError: pass finally: self._closed = True diff --git a/neo4j/io/_bolt4.py b/neo4j/io/_bolt4.py index 4968332c3..b1faa23d3 100644 --- a/neo4j/io/_bolt4.py +++ b/neo4j/io/_bolt4.py @@ -19,46 +19,50 @@ # limitations under the License. from collections import deque +from logging import getLogger from ssl import SSLSocket from time import perf_counter + +from neo4j._exceptions import ( + BoltError, + BoltIncompleteCommitError, + BoltProtocolError, +) +from neo4j.addressing import Address from neo4j.api import ( - Version, READ_ACCESS, SYSTEM_DATABASE, + Version, ) -from neo4j.io._common import ( - Inbox, - Outbox, - Response, - InitResponse, - CommitResponse, -) -from neo4j.meta import get_user_agent +from neo4j.api import ServerInfo from neo4j.exceptions import ( AuthError, - ServiceUnavailable, DatabaseUnavailable, - NotALeader, + DriverError, ForbiddenOnReadOnlyDatabase, + NotALeader, + ServiceUnavailable, SessionExpired, ) -from neo4j._exceptions import ( - BoltIncompleteCommitError, - BoltProtocolError, -) -from neo4j.packstream import ( - Unpacker, - Packer, -) from neo4j.io import ( Bolt, BoltPool, check_supported_server_product, ) -from neo4j.api import ServerInfo -from neo4j.addressing import Address +from neo4j.io._common import ( + CommitResponse, + Inbox, + InitResponse, + Outbox, + Response, +) +from neo4j.meta import get_user_agent +from neo4j.packstream import ( + Unpacker, + Packer, +) + -from logging import getLogger log = getLogger("neo4j") @@ -87,7 +91,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No self.socket = sock self.server_info = ServerInfo(Address(sock.getpeername()), self.PROTOCOL_VERSION) self.outbox = Outbox() - self.inbox = Inbox(self.socket, on_error=self._set_defunct) + self.inbox = Inbox(self.socket, on_error=self._set_defunct_read) self.packer = Packer(self.outbox) self.unpacker = Unpacker(self.inbox) self.responses = deque() @@ -137,7 +141,7 @@ def der_encoded_server_certificate(self): def local_port(self): try: return self.socket.getsockname()[1] - except IOError: + except OSError: return 0 def get_base_headers(self): @@ -300,7 +304,10 @@ def fail(metadata): def _send_all(self): data = self.outbox.view() if data: - self.socket.sendall(data) + try: + self.socket.sendall(data) + except OSError as error: + self._set_defunct_write(error) self.outbox.clear() def send_all(self): @@ -314,17 +321,7 @@ def send_all(self): raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format( self.unresolved_address, self.server_info.address)) - try: - self._send_all() - except (IOError, OSError) as error: - log.error("Failed to write data to connection " - "{!r} ({!r}); ({!r})". - format(self.unresolved_address, - self.server_info.address, - "; ".join(map(repr, error.args)))) - if self.pool: - self.pool.deactivate(address=self.unresolved_address) - raise + self._send_all() def fetch_message(self): """ Receive at least one message from the server, if available. @@ -344,17 +341,7 @@ def fetch_message(self): return 0, 0 # Receive exactly one message - try: - details, summary_signature, summary_metadata = next(self.inbox) - except (IOError, OSError) as error: - log.error("Failed to read data from connection " - "{!r} ({!r}); ({!r})". - format(self.unresolved_address, - self.server_info.address, - "; ".join(map(repr, error.args)))) - if self.pool: - self.pool.deactivate(address=self.unresolved_address) - raise + details, summary_signature, summary_metadata = next(self.inbox) if details: log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # Do not log any data @@ -389,11 +376,20 @@ def fetch_message(self): return len(details), 1 - def _set_defunct(self, error=None): - direct_driver = isinstance(self.pool, BoltPool) + def _set_defunct_read(self, error=None): + message = "Failed to read from defunct connection {!r} ({!r})".format( + self.unresolved_address, self.server_info.address + ) + self._set_defunct(message, error=error) + + def _set_defunct_write(self, error=None): + message = "Failed to write data to connection {!r} ({!r})".format( + self.unresolved_address, self.server_info.address + ) + self._set_defunct(message, error=error) - message = ("Failed to read from defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server_info.address)) + def _set_defunct(self, message, error=None): + direct_driver = isinstance(self.pool, BoltPool) if error: log.error(str(error)) @@ -454,12 +450,12 @@ def close(self): self._append(b"\x02", ()) try: self._send_all() - except: + except (OSError, BoltError, DriverError): pass log.debug("[#%04X] C: ", self.local_port) try: self.socket.close() - except IOError: + except OSError: pass finally: self._closed = True diff --git a/neo4j/io/_common.py b/neo4j/io/_common.py index e7dd36536..213b900f0 100644 --- a/neo4j/io/_common.py +++ b/neo4j/io/_common.py @@ -22,8 +22,8 @@ from struct import pack as struct_pack from neo4j.exceptions import ( - Neo4jError, AuthError, + Neo4jError, ServiceUnavailable, ) from neo4j.packstream import ( From 26b04f5f061d8e045fb4447080d96be91ed34a70 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Wed, 24 Mar 2021 15:56:29 +0100 Subject: [PATCH 2/8] Session discards transaction on network errors --- neo4j/work/result.py | 40 ++++++++++++++++++++++++++++++++++----- neo4j/work/simple.py | 31 +++++++++++++++++++++++++----- neo4j/work/transaction.py | 25 ++++++++++++++++++------ 3 files changed, 80 insertions(+), 16 deletions(-) diff --git a/neo4j/work/result.py b/neo4j/work/result.py index cf68741e4..2ecbd7470 100644 --- a/neo4j/work/result.py +++ b/neo4j/work/result.py @@ -20,20 +20,50 @@ from collections import deque +from contextlib import contextmanager from warnings import warn +from neo4j._exceptions import BoltIncompleteCommitError from neo4j.data import DataDehydrator +from neo4j.exceptions import ( + ServiceUnavailable, + SessionExpired, +) from neo4j.work.summary import ResultSummary +class _ConnectionErrorHandler: + def __init__(self, connection, on_network_error): + self._connection = connection + self._on_network_error = on_network_error + + def __getattr__(self, item): + connection_attr = getattr(self._connection, item) + if not callable(connection_attr): + return connection_attr + + def outer(func): + def inner(*args, **kwargs): + try: + func(*args, **kwargs) + except (SessionExpired, ServiceUnavailable, + BoltIncompleteCommitError) as error: + self._on_network_error(error) + raise + return inner + + return outer(connection_attr) + + class Result: """A handler for the result of Cypher query execution. Instances of this class are typically constructed and returned by :meth:`.Session.run` and :meth:`.Transaction.run`. """ - def __init__(self, connection, hydrant, fetch_size, on_closed): - self._connection = connection + def __init__(self, connection, hydrant, fetch_size, on_closed, + on_network_error): + self._connection = _ConnectionErrorHandler(connection, on_network_error) self._hydrant = hydrant self._on_closed = on_closed self._metadata = None @@ -97,7 +127,7 @@ def on_failed_attach(metadata): on_failure=on_failed_attach, ) self._pull() - self._connection.send_all() + self._connection._send_all() self._attach() def _pull(self): @@ -187,10 +217,10 @@ def __iter__(self): yield self._record_buffer.popleft() elif self._discarding and self._streaming is False: self._discard() - self._connection.send_all() + self._connection._send_all() elif self._has_more and self._streaming is False: self._pull() - self._connection.send_all() + self._connection._send_all() self._closed = True diff --git a/neo4j/work/simple.py b/neo4j/work/simple.py index 0ee46e7b8..0d09cd683 100644 --- a/neo4j/work/simple.py +++ b/neo4j/work/simple.py @@ -130,6 +130,11 @@ def _result_closed(self): self._autoResult = None self._disconnect() + def _result_network_error(self, error): + if self._autoResult: + self._autoResult = None + self._disconnect() + def close(self): """Close the session. This will release any borrowed resources, such as connections, and will roll back any outstanding transactions. """ @@ -213,8 +218,14 @@ def run(self, query, parameters=None, **kwparameters): hydrant = DataHydrator() - self._autoResult = Result(cx, hydrant, self._config.fetch_size, self._result_closed) - self._autoResult._run(query, parameters, self._config.database, self._config.default_access_mode, self._bookmarks, **kwparameters) + self._autoResult = Result( + cx, hydrant, self._config.fetch_size, self._result_closed, + self._result_network_error + ) + self._autoResult._run( + query, parameters, self._config.database, + self._config.default_access_mode, self._bookmarks, **kwparameters + ) return self._autoResult @@ -243,10 +254,20 @@ def _transaction_closed_handler(self): self._transaction = None self._disconnect() - def _open_transaction(self, *, access_mode, database, metadata=None, timeout=None): + def _transaction_network_error_handler(self, error): + if self._transaction: + self._transaction = None + self._disconnect() + + def _open_transaction(self, *, access_mode, database, metadata=None, + timeout=None): self._connect(access_mode=access_mode, database=database) - self._transaction = Transaction(self._connection, self._config.fetch_size, self._transaction_closed_handler) - self._transaction._begin(database, self._bookmarks, access_mode, metadata, timeout) + self._transaction = Transaction( + self._connection, self._config.fetch_size, + self._transaction_closed_handler, + self._transaction_network_error_handler) + self._transaction._begin(database, self._bookmarks, access_mode, + metadata, timeout) def begin_transaction(self, metadata=None, timeout=None): """ Begin a new unmanaged transaction. Creates a new :class:`.Transaction` within this session. diff --git a/neo4j/work/transaction.py b/neo4j/work/transaction.py index 7cb7dc968..aae871a8c 100644 --- a/neo4j/work/transaction.py +++ b/neo4j/work/transaction.py @@ -39,13 +39,14 @@ class Transaction: """ - def __init__(self, connection, fetch_size, on_closed): + def __init__(self, connection, fetch_size, on_closed, on_network_error): self._connection = connection self._bookmark = None self._results = [] self._closed = False self._fetch_size = fetch_size self._on_closed = on_closed + self._on_network_error = on_network_error def __enter__(self): return self @@ -59,11 +60,16 @@ def __exit__(self, exception_type, exception_value, traceback): self.close() def _begin(self, database, bookmarks, access_mode, metadata, timeout): - self._connection.begin(bookmarks=bookmarks, metadata=metadata, timeout=timeout, mode=access_mode, db=database) + self._connection.begin(bookmarks=bookmarks, metadata=metadata, + timeout=timeout, mode=access_mode, db=database) def _result_on_closed_handler(self): pass + def _result_on_network_error_handler(self, error): + self._closed = True + self._on_network_error(error) + def _consume_results(self): for result in self._results: result.consume() @@ -108,11 +114,18 @@ def run(self, query, parameters=None, **kwparameters): if self._closed: raise TransactionError("Transaction closed") - if self._results and self._connection.supports_multiple_results is False: + if (self._results + and self._connection.supports_multiple_results is False): # Bolt 3 Support - self._results[-1]._buffer_all() # Buffer upp all records for the previous Result because it does not have any qid to fetch in batches. - - result = Result(self._connection, DataHydrator(), self._fetch_size, self._result_on_closed_handler) + # Buffer up all records for the previous Result because it does not + # have any qid to fetch in batches. + self._results[-1]._buffer_all() + + result = Result( + self._connection, DataHydrator(), self._fetch_size, + self._result_on_closed_handler, + self._result_on_network_error_handler + ) self._results.append(result) result._tx_ready_run(query, parameters, **kwparameters) From ddd79a07fdc474f5561cb9f63650151e37100daa Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Fri, 26 Mar 2021 12:51:59 +0100 Subject: [PATCH 3/8] Workspace/Session releases connection properly --- neo4j/work/__init__.py | 2 +- neo4j/work/simple.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neo4j/work/__init__.py b/neo4j/work/__init__.py index a516004cb..e5fd1659a 100644 --- a/neo4j/work/__init__.py +++ b/neo4j/work/__init__.py @@ -61,7 +61,7 @@ def _disconnect(self, sync): except (WorkspaceError, ServiceUnavailable): pass if self._connection: - self._connection.in_use = False + self._pool.release(self._connection) self._connection = None self._connection_access_mode = None diff --git a/neo4j/work/simple.py b/neo4j/work/simple.py index 0d09cd683..f8dc585f0 100644 --- a/neo4j/work/simple.py +++ b/neo4j/work/simple.py @@ -117,7 +117,7 @@ def _connect(self, access_mode, database): def _disconnect(self): if self._connection: - self._connection.in_use = False + self._pool.release(self._connection) self._connection = None def _collect_bookmark(self, bookmark): From abb01e2524ac1d0c69842fd29f9924d61c7ba9bf Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Fri, 26 Mar 2021 12:53:34 +0100 Subject: [PATCH 4/8] Adjust test for session releasing connection on fail When a network error occurs, the session should release the current connection as it's dead and acquire a new one for the next transaction. --- tests/stub/test_routingdriver.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/stub/test_routingdriver.py b/tests/stub/test_routingdriver.py index 877c4db72..61c23ee30 100644 --- a/tests/stub/test_routingdriver.py +++ b/tests/stub/test_routingdriver.py @@ -617,7 +617,8 @@ def test_forgets_address_on_service_unavailable_error(driver_info, test_scripts, conns = pool.connections[('127.0.0.1', 9004)] conn = conns[0] assert conn._closed is True - assert conn.in_use is True + assert conn.in_use is False + assert session._connection is None assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} # reader 127.0.0.1:9004 should've been forgotten because of an error assert not table.readers From ed8b76bfedf9ba354b6550f94d79a1d4930273fb Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Thu, 1 Apr 2021 11:06:02 +0200 Subject: [PATCH 5/8] Code style --- neo4j/work/simple.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/neo4j/work/simple.py b/neo4j/work/simple.py index f8dc585f0..b67c4e806 100644 --- a/neo4j/work/simple.py +++ b/neo4j/work/simple.py @@ -265,7 +265,8 @@ def _open_transaction(self, *, access_mode, database, metadata=None, self._transaction = Transaction( self._connection, self._config.fetch_size, self._transaction_closed_handler, - self._transaction_network_error_handler) + self._transaction_network_error_handler + ) self._transaction._begin(database, self._bookmarks, access_mode, metadata, timeout) From 007f817153ff34409a64d92709a6ca79b895e8ec Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Thu, 1 Apr 2021 11:06:32 +0200 Subject: [PATCH 6/8] Adding doc sting to _ConnectionErrorHandler --- neo4j/work/result.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/neo4j/work/result.py b/neo4j/work/result.py index 2ecbd7470..b3f3a9258 100644 --- a/neo4j/work/result.py +++ b/neo4j/work/result.py @@ -33,7 +33,24 @@ class _ConnectionErrorHandler: + """ + Wrapper class for handling connection errors. + + The class will wrap each method to invoke a callback if the method raises + SessionExpired, ServiceUnavailable, or BoltIncompleteCommitError. + The error will be re-raised after the callback. + """ + def __init__(self, connection, on_network_error): + """ + :param connection the connection object to warp + :type connection Bolt + :param on_network_error the function to be called when a method of + connection raises of of the caught errors. The callback takes the + error as argument. + :type on_network_error callable + + """ self._connection = connection self._on_network_error = on_network_error From bfc708684481ff5f64ff0faeedf297657c7755a3 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Thu, 1 Apr 2021 11:07:29 +0200 Subject: [PATCH 7/8] Revert accidental change of called method --- neo4j/work/result.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/neo4j/work/result.py b/neo4j/work/result.py index b3f3a9258..a328d8b7e 100644 --- a/neo4j/work/result.py +++ b/neo4j/work/result.py @@ -144,7 +144,7 @@ def on_failed_attach(metadata): on_failure=on_failed_attach, ) self._pull() - self._connection._send_all() + self._connection.send_all() self._attach() def _pull(self): @@ -234,10 +234,10 @@ def __iter__(self): yield self._record_buffer.popleft() elif self._discarding and self._streaming is False: self._discard() - self._connection._send_all() + self._connection.send_all() elif self._has_more and self._streaming is False: self._pull() - self._connection._send_all() + self._connection.send_all() self._closed = True From ec15a9762b7ec78923c8f9a29704e84ebbba32e9 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 6 Apr 2021 13:23:10 +0200 Subject: [PATCH 8/8] Fix broken error handling from merging 4.3 --- neo4j/work/result.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/neo4j/work/result.py b/neo4j/work/result.py index a328d8b7e..19d768b42 100644 --- a/neo4j/work/result.py +++ b/neo4j/work/result.py @@ -20,10 +20,8 @@ from collections import deque -from contextlib import contextmanager from warnings import warn -from neo4j._exceptions import BoltIncompleteCommitError from neo4j.data import DataDehydrator from neo4j.exceptions import ( ServiceUnavailable, @@ -37,7 +35,7 @@ class _ConnectionErrorHandler: Wrapper class for handling connection errors. The class will wrap each method to invoke a callback if the method raises - SessionExpired, ServiceUnavailable, or BoltIncompleteCommitError. + SessionExpired or ServiceUnavailable. The error will be re-raised after the callback. """ @@ -63,8 +61,7 @@ def outer(func): def inner(*args, **kwargs): try: func(*args, **kwargs) - except (SessionExpired, ServiceUnavailable, - BoltIncompleteCommitError) as error: + except (SessionExpired, ServiceUnavailable) as error: self._on_network_error(error) raise return inner