From 6f3fae77288cd7562b63a05eef433f2cee011d8f Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Thu, 6 Mar 2025 14:26:19 +0200 Subject: [PATCH] Fixing async cluster pipeline execution when client is created with cluster_error_retry_attempts=0 --- redis/asyncio/cluster.py | 23 +++++++++++------------ tests/test_asyncio/test_cluster.py | 13 ++++++++++++- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index f343e26b75..bd78042c37 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1509,29 +1509,28 @@ async def execute( return [] try: - for _ in range(self._client.cluster_error_retry_attempts): - if self._client._initialize: - await self._client.initialize() - + retry_attempts = self._client.cluster_error_retry_attempts + while True: try: + if self._client._initialize: + await self._client.initialize() return await self._execute( self._client, self._command_stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, ) - except BaseException as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: - # Try again with the new cluster setup. - exception = e + + except self.__class__.ERRORS_ALLOW_RETRY as e: + if retry_attempts > 0: + # Try again with the new cluster setup. All other errors + # should be raised. + retry_attempts -= 1 await self._client.aclose() await asyncio.sleep(0.25) else: # All other errors should be raised. - raise - - # If it fails the configured number of times then raise an exception - raise exception + raise e finally: self._command_stack = [] diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 9911eced27..4fbfcf62ce 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -2673,6 +2673,17 @@ async def test_redis_cluster_pipeline(self, r: RedisCluster) -> None: ) assert result == [True, b"1", 1, {b"F": b"V"}, True, True, b"2", b"3", 1, 1, 1] + async def test_cluster_pipeline_execution_zero_cluster_err_retries( + self, r: RedisCluster + ) -> None: + """ + Test that we can run successfully cluster pipeline execute at least once when + cluster_error_retry_attempts is set to 0 + """ + r.cluster_error_retry_attempts = 0 + result = await r.pipeline().set("A", 1).get("A").delete("A").execute() + assert result == [True, b"1", 1] + async def test_multi_key_operation_with_a_single_slot( self, r: RedisCluster ) -> None: @@ -2733,7 +2744,7 @@ async def parse_response( await pipe.get(key).execute() assert ( node.parse_response.await_count - == 3 * r.cluster_error_retry_attempts - 2 + == 3 * r.cluster_error_retry_attempts + 1 ) async def test_connection_error_not_raised(self, r: RedisCluster) -> None: