diff --git a/docs/source/api.rst b/docs/source/api.rst index 4cfb1512..67972ba0 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -430,6 +430,11 @@ it should be chosen larger than :ref:`connection-timeout-ref`. :Type: ``float`` :Default: ``60.0`` +.. versionadded:: 6.0 + The setting now entails *anything* required to acquire a connection. + This includes potential fetching of routing tables which in itself requires acquiring a connection. + Previously, the timeout wold be restarted for such auxiliary connection acquisitions. + .. _connection-timeout-ref: diff --git a/src/neo4j/_async/io/__init__.py b/src/neo4j/_async/io/__init__.py index 7c950834..f7c926f0 100644 --- a/src/neo4j/_async/io/__init__.py +++ b/src/neo4j/_async/io/__init__.py @@ -28,6 +28,7 @@ "AsyncBoltPool", "AsyncNeo4jPool", "ConnectionErrorHandler", + "acquisition_timeout_to_deadline", ] @@ -40,6 +41,7 @@ from ._bolt import AsyncBolt from ._common import ConnectionErrorHandler from ._pool import ( + acquisition_timeout_to_deadline, AcquisitionAuth, AcquisitionDatabase, AsyncBoltPool, diff --git a/src/neo4j/_async/io/_pool.py b/src/neo4j/_async/io/_pool.py index 58eeafda..935e979f 100644 --- a/src/neo4j/_async/io/_pool.py +++ b/src/neo4j/_async/io/_pool.py @@ -260,6 +260,8 @@ async def connection_creator(): with self.lock: self.connections_reservations[address] -= 1 + if deadline.expired(): + return None max_pool_size = self.pool_config.max_connection_pool_size infinite_pool_size = max_pool_size < 0 or max_pool_size == float("inf") with self.lock: @@ -969,7 +971,9 @@ async def update_routing_table( :raise neo4j.exceptions.ServiceUnavailable: """ - _check_acquisition_timeout(acquisition_timeout) + acquisition_timeout = acquisition_timeout_to_deadline( + acquisition_timeout + ) async with self.refresh_lock: routing_table = await self.get_routing_table(database) if routing_table is not None: @@ -1147,7 +1151,7 @@ async def acquire( database_callback=None, ): access_mode = check_access_mode(access_mode) - _check_acquisition_timeout(timeout) + timeout = acquisition_timeout_to_deadline(timeout) target_database = database.name @@ -1242,6 +1246,13 @@ async def on_write_failure(self, address, database): log.debug("[#0000] _: table=%r", self.routing_tables) +def acquisition_timeout_to_deadline(timeout: object) -> Deadline: + if isinstance(timeout, Deadline): + return timeout + _check_acquisition_timeout(timeout) + return Deadline(timeout) + + def _check_acquisition_timeout(timeout: object) -> None: if not isinstance(timeout, (int, float)): raise TypeError( diff --git a/src/neo4j/_async/work/workspace.py b/src/neo4j/_async/work/workspace.py index dae2fbbc..75960d22 100644 --- a/src/neo4j/_async/work/workspace.py +++ b/src/neo4j/_async/work/workspace.py @@ -30,12 +30,14 @@ ) from .._debug import AsyncNonConcurrentMethodChecker from ..io import ( + acquisition_timeout_to_deadline, AcquisitionAuth, AcquisitionDatabase, ) if t.TYPE_CHECKING: + from ..._deadline import Deadline from ...api import _TAuth from ...auth_management import ( AsyncAuthManager, @@ -159,13 +161,19 @@ async def _connect(self, access_mode, auth=None, **acquire_kwargs) -> None: await self._connection.fetch_all() await self._disconnect() + acquisition_deadline = acquisition_timeout_to_deadline( + acquisition_timeout + ) + ssr_enabled = self._pool.ssr_enabled target_db = await self._get_routing_target_database( - acquire_auth, ssr_enabled=ssr_enabled + acquire_auth, + ssr_enabled=ssr_enabled, + acquisition_deadline=acquisition_deadline, ) acquire_kwargs_ = { "access_mode": access_mode, - "timeout": acquisition_timeout, + "timeout": acquisition_deadline, "database": target_db, "bookmarks": await self._get_bookmarks(), "auth": acquire_auth, @@ -188,7 +196,9 @@ async def _connect(self, access_mode, auth=None, **acquire_kwargs) -> None: ) await self._disconnect() target_db = await self._get_routing_target_database( - acquire_auth, ssr_enabled=False + acquire_auth, + ssr_enabled=False, + acquisition_deadline=acquisition_deadline, ) acquire_kwargs_["database"] = target_db self._connection = await self._pool.acquire(**acquire_kwargs_) @@ -198,6 +208,7 @@ async def _get_routing_target_database( self, acquire_auth: AcquisitionAuth, ssr_enabled: bool, + acquisition_deadline: Deadline, ) -> AcquisitionDatabase: if ( self._pinned_database @@ -232,14 +243,13 @@ async def _get_routing_target_database( ) return AcquisitionDatabase(cached_db, guessed=True) - acquisition_timeout = self._config.connection_acquisition_timeout log.debug("[#0000] _: resolve home database") await self._pool.update_routing_table( database=self._config.database, imp_user=self._config.impersonated_user, bookmarks=await self._get_bookmarks(), auth=acquire_auth, - acquisition_timeout=acquisition_timeout, + acquisition_timeout=acquisition_deadline, database_callback=self._make_db_resolution_callback(), ) return AcquisitionDatabase(self._config.database) diff --git a/src/neo4j/_sync/io/__init__.py b/src/neo4j/_sync/io/__init__.py index 775fd504..097890e6 100644 --- a/src/neo4j/_sync/io/__init__.py +++ b/src/neo4j/_sync/io/__init__.py @@ -28,6 +28,7 @@ "BoltPool", "Neo4jPool", "ConnectionErrorHandler", + "acquisition_timeout_to_deadline", ] @@ -40,6 +41,7 @@ from ._bolt import Bolt from ._common import ConnectionErrorHandler from ._pool import ( + acquisition_timeout_to_deadline, AcquisitionAuth, AcquisitionDatabase, BoltPool, diff --git a/src/neo4j/_sync/io/_pool.py b/src/neo4j/_sync/io/_pool.py index 1819ad08..8deb06c6 100644 --- a/src/neo4j/_sync/io/_pool.py +++ b/src/neo4j/_sync/io/_pool.py @@ -257,6 +257,8 @@ def connection_creator(): with self.lock: self.connections_reservations[address] -= 1 + if deadline.expired(): + return None max_pool_size = self.pool_config.max_connection_pool_size infinite_pool_size = max_pool_size < 0 or max_pool_size == float("inf") with self.lock: @@ -966,7 +968,9 @@ def update_routing_table( :raise neo4j.exceptions.ServiceUnavailable: """ - _check_acquisition_timeout(acquisition_timeout) + acquisition_timeout = acquisition_timeout_to_deadline( + acquisition_timeout + ) with self.refresh_lock: routing_table = self.get_routing_table(database) if routing_table is not None: @@ -1144,7 +1148,7 @@ def acquire( database_callback=None, ): access_mode = check_access_mode(access_mode) - _check_acquisition_timeout(timeout) + timeout = acquisition_timeout_to_deadline(timeout) target_database = database.name @@ -1239,6 +1243,13 @@ def on_write_failure(self, address, database): log.debug("[#0000] _: table=%r", self.routing_tables) +def acquisition_timeout_to_deadline(timeout: object) -> Deadline: + if isinstance(timeout, Deadline): + return timeout + _check_acquisition_timeout(timeout) + return Deadline(timeout) + + def _check_acquisition_timeout(timeout: object) -> None: if not isinstance(timeout, (int, float)): raise TypeError( diff --git a/src/neo4j/_sync/work/workspace.py b/src/neo4j/_sync/work/workspace.py index 1be5a744..a13b53c7 100644 --- a/src/neo4j/_sync/work/workspace.py +++ b/src/neo4j/_sync/work/workspace.py @@ -30,12 +30,14 @@ ) from .._debug import NonConcurrentMethodChecker from ..io import ( + acquisition_timeout_to_deadline, AcquisitionAuth, AcquisitionDatabase, ) if t.TYPE_CHECKING: + from ..._deadline import Deadline from ...api import _TAuth from ...auth_management import AuthManager from ..home_db_cache import ( @@ -156,13 +158,19 @@ def _connect(self, access_mode, auth=None, **acquire_kwargs) -> None: self._connection.fetch_all() self._disconnect() + acquisition_deadline = acquisition_timeout_to_deadline( + acquisition_timeout + ) + ssr_enabled = self._pool.ssr_enabled target_db = self._get_routing_target_database( - acquire_auth, ssr_enabled=ssr_enabled + acquire_auth, + ssr_enabled=ssr_enabled, + acquisition_deadline=acquisition_deadline, ) acquire_kwargs_ = { "access_mode": access_mode, - "timeout": acquisition_timeout, + "timeout": acquisition_deadline, "database": target_db, "bookmarks": self._get_bookmarks(), "auth": acquire_auth, @@ -185,7 +193,9 @@ def _connect(self, access_mode, auth=None, **acquire_kwargs) -> None: ) self._disconnect() target_db = self._get_routing_target_database( - acquire_auth, ssr_enabled=False + acquire_auth, + ssr_enabled=False, + acquisition_deadline=acquisition_deadline, ) acquire_kwargs_["database"] = target_db self._connection = self._pool.acquire(**acquire_kwargs_) @@ -195,6 +205,7 @@ def _get_routing_target_database( self, acquire_auth: AcquisitionAuth, ssr_enabled: bool, + acquisition_deadline: Deadline, ) -> AcquisitionDatabase: if ( self._pinned_database @@ -229,14 +240,13 @@ def _get_routing_target_database( ) return AcquisitionDatabase(cached_db, guessed=True) - acquisition_timeout = self._config.connection_acquisition_timeout log.debug("[#0000] _: resolve home database") self._pool.update_routing_table( database=self._config.database, imp_user=self._config.impersonated_user, bookmarks=self._get_bookmarks(), auth=acquire_auth, - acquisition_timeout=acquisition_timeout, + acquisition_timeout=acquisition_deadline, database_callback=self._make_db_resolution_callback(), ) return AcquisitionDatabase(self._config.database) diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 0c784bc7..3777f4e2 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -15,13 +15,7 @@ "'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'": "test_subtest_skips.tz_id", "stub\\.routing\\.test_routing_v[0-9x]+\\.RoutingV[0-9x]+\\.test_should_drop_connections_failing_liveness_check": - "Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83", - "'stub.homedb.test_homedb.TestHomeDbMixedCluster.test_connection_acquisition_timeout_during_fallback'": - "TODO: 6.0 - pending unification: connection acquisition timeout should count towards the total time spent waiting for a connection (including routing, home db resolution, ...)", - "'stub.driver_parameters.test_connection_acquisition_timeout_ms.TestConnectionAcquisitionTimeoutMs.test_does_encompass_router_route_response'": - "TODO: 6.0 - pending unification: connection acquisition timeout should count towards the total time spent waiting for a connection (including routing, home db resolution, ...)", - "'stub.driver_parameters.test_connection_acquisition_timeout_ms.TestConnectionAcquisitionTimeoutMs.test_router_handshake_shares_acquisition_timeout'": - "TODO: 6.0 - pending unification: connection acquisition timeout should count towards the total time spent waiting for a connection (including routing, home db resolution, ...)" + "Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83" }, "features": { "Feature:API:BookmarkManager": true,