Skip to content

Unify connection_acquisition_timeout behavior #1215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: 6.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ it should be chosen larger than :ref:`connection-timeout-ref`.
:Type: ``float``
:Default: ``60.0``

.. versionadded:: 6.0
The setting now entails *anything* required to acquire a connection.
This includes potential fetching of routing tables which in itself requires acquiring a connection.
Previously, the timeout wold be restarted for such auxiliary connection acquisitions.


.. _connection-timeout-ref:

Expand Down
2 changes: 2 additions & 0 deletions src/neo4j/_async/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"AsyncBoltPool",
"AsyncNeo4jPool",
"ConnectionErrorHandler",
"acquisition_timeout_to_deadline",
]


Expand All @@ -40,6 +41,7 @@
from ._bolt import AsyncBolt
from ._common import ConnectionErrorHandler
from ._pool import (
acquisition_timeout_to_deadline,
AcquisitionAuth,
AcquisitionDatabase,
AsyncBoltPool,
Expand Down
15 changes: 13 additions & 2 deletions src/neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ async def connection_creator():
with self.lock:
self.connections_reservations[address] -= 1

if deadline.expired():
return None
max_pool_size = self.pool_config.max_connection_pool_size
infinite_pool_size = max_pool_size < 0 or max_pool_size == float("inf")
with self.lock:
Expand Down Expand Up @@ -969,7 +971,9 @@ async def update_routing_table(

:raise neo4j.exceptions.ServiceUnavailable:
"""
_check_acquisition_timeout(acquisition_timeout)
acquisition_timeout = acquisition_timeout_to_deadline(
acquisition_timeout
)
async with self.refresh_lock:
routing_table = await self.get_routing_table(database)
if routing_table is not None:
Expand Down Expand Up @@ -1147,7 +1151,7 @@ async def acquire(
database_callback=None,
):
access_mode = check_access_mode(access_mode)
_check_acquisition_timeout(timeout)
timeout = acquisition_timeout_to_deadline(timeout)

target_database = database.name

Expand Down Expand Up @@ -1242,6 +1246,13 @@ async def on_write_failure(self, address, database):
log.debug("[#0000] _: <POOL> table=%r", self.routing_tables)


def acquisition_timeout_to_deadline(timeout: object) -> Deadline:
if isinstance(timeout, Deadline):
return timeout
_check_acquisition_timeout(timeout)
return Deadline(timeout)


def _check_acquisition_timeout(timeout: object) -> None:
if not isinstance(timeout, (int, float)):
raise TypeError(
Expand Down
20 changes: 15 additions & 5 deletions src/neo4j/_async/work/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
)
from .._debug import AsyncNonConcurrentMethodChecker
from ..io import (
acquisition_timeout_to_deadline,
AcquisitionAuth,
AcquisitionDatabase,
)


if t.TYPE_CHECKING:
from ..._deadline import Deadline
from ...api import _TAuth
from ...auth_management import (
AsyncAuthManager,
Expand Down Expand Up @@ -159,13 +161,19 @@ async def _connect(self, access_mode, auth=None, **acquire_kwargs) -> None:
await self._connection.fetch_all()
await self._disconnect()

acquisition_deadline = acquisition_timeout_to_deadline(
acquisition_timeout
)

ssr_enabled = self._pool.ssr_enabled
target_db = await self._get_routing_target_database(
acquire_auth, ssr_enabled=ssr_enabled
acquire_auth,
ssr_enabled=ssr_enabled,
acquisition_deadline=acquisition_deadline,
)
acquire_kwargs_ = {
"access_mode": access_mode,
"timeout": acquisition_timeout,
"timeout": acquisition_deadline,
"database": target_db,
"bookmarks": await self._get_bookmarks(),
"auth": acquire_auth,
Expand All @@ -188,7 +196,9 @@ async def _connect(self, access_mode, auth=None, **acquire_kwargs) -> None:
)
await self._disconnect()
target_db = await self._get_routing_target_database(
acquire_auth, ssr_enabled=False
acquire_auth,
ssr_enabled=False,
acquisition_deadline=acquisition_deadline,
)
acquire_kwargs_["database"] = target_db
self._connection = await self._pool.acquire(**acquire_kwargs_)
Expand All @@ -198,6 +208,7 @@ async def _get_routing_target_database(
self,
acquire_auth: AcquisitionAuth,
ssr_enabled: bool,
acquisition_deadline: Deadline,
) -> AcquisitionDatabase:
if (
self._pinned_database
Expand Down Expand Up @@ -232,14 +243,13 @@ async def _get_routing_target_database(
)
return AcquisitionDatabase(cached_db, guessed=True)

acquisition_timeout = self._config.connection_acquisition_timeout
log.debug("[#0000] _: <WORKSPACE> resolve home database")
await self._pool.update_routing_table(
database=self._config.database,
imp_user=self._config.impersonated_user,
bookmarks=await self._get_bookmarks(),
auth=acquire_auth,
acquisition_timeout=acquisition_timeout,
acquisition_timeout=acquisition_deadline,
database_callback=self._make_db_resolution_callback(),
)
return AcquisitionDatabase(self._config.database)
Expand Down
2 changes: 2 additions & 0 deletions src/neo4j/_sync/io/__init__.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions src/neo4j/_sync/io/_pool.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 15 additions & 5 deletions src/neo4j/_sync/work/workspace.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions testkitbackend/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'":
"test_subtest_skips.tz_id",
"stub\\.routing\\.test_routing_v[0-9x]+\\.RoutingV[0-9x]+\\.test_should_drop_connections_failing_liveness_check":
"Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83",
"'stub.homedb.test_homedb.TestHomeDbMixedCluster.test_connection_acquisition_timeout_during_fallback'":
"TODO: 6.0 - pending unification: connection acquisition timeout should count towards the total time spent waiting for a connection (including routing, home db resolution, ...)",
"'stub.driver_parameters.test_connection_acquisition_timeout_ms.TestConnectionAcquisitionTimeoutMs.test_does_encompass_router_route_response'":
"TODO: 6.0 - pending unification: connection acquisition timeout should count towards the total time spent waiting for a connection (including routing, home db resolution, ...)",
"'stub.driver_parameters.test_connection_acquisition_timeout_ms.TestConnectionAcquisitionTimeoutMs.test_router_handshake_shares_acquisition_timeout'":
"TODO: 6.0 - pending unification: connection acquisition timeout should count towards the total time spent waiting for a connection (including routing, home db resolution, ...)"
"Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83"
},
"features": {
"Feature:API:BookmarkManager": true,
Expand Down