From 878e51986ea94645bd041abf06a1f6243aa67383 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Mon, 17 Nov 2025 08:38:50 +0200 Subject: [PATCH 1/4] refactor to use cluster slots to ensure agreement --- RLTest/redis_cluster.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/RLTest/redis_cluster.py b/RLTest/redis_cluster.py index 8c412bb..44375f9 100644 --- a/RLTest/redis_cluster.py +++ b/RLTest/redis_cluster.py @@ -48,7 +48,7 @@ def getInformationBeforeDispose(self): return [shard.getInformationBeforeDispose() for shard in self.shards] def getInformationAfterDispose(self): - return [shard.getInformationAfterDispose() for shard in self.shards] + return [shard.getInformationAfterDispose() for shard in self.shards] def waitCluster(self, timeout_sec=40): st = time.time() @@ -56,14 +56,17 @@ def waitCluster(self, timeout_sec=40): while st + timeout_sec > time.time(): ok = 0 + first_view = None for shard in self.shards: con = shard.getConnection() try: - status = con.execute_command('CLUSTER', 'INFO') + slots_pov = con.execute_command('CLUSTER', 'SLOTS') except Exception as e: print('got error on cluster info, will try again, %s' % str(e)) continue - if 'cluster_state:ok' in str(status): + if first_view is None: + first_view = slots_pov + if slots_pov == first_view: ok += 1 if ok == len(self.shards): for shard in self.shards: From 858359c0584ff583d00463d4171321a791aaef06 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Mon, 17 Nov 2025 08:38:58 +0200 Subject: [PATCH 2/4] expose to Env --- RLTest/env.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/RLTest/env.py b/RLTest/env.py index 925fa62..9083209 100644 --- a/RLTest/env.py +++ b/RLTest/env.py @@ -397,6 +397,10 @@ def getClusterConnectionIfNeeded(self): else: return self.getConnection() + def waitCluster(self, timeout_sec=40): + if isinstance(self.envRunner, ClusterEnv) or isinstance(self.envRunner, EnterpriseRedisClusterEnv): + self.envRunner.waitCluster(timeout_sec) + def addShardToClusterIfExists(self): if isinstance(self.envRunner, ClusterEnv): test_fname = self.testName.replace(':', '_') From 97767f6d741ff9a17f9028a5abf3df1209932839 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Mon, 17 Nov 2025 09:18:16 +0200 Subject: [PATCH 3/4] wait for both OK and same topo --- RLTest/redis_cluster.py | 45 +++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/RLTest/redis_cluster.py b/RLTest/redis_cluster.py index 44375f9..c2b1e78 100644 --- a/RLTest/redis_cluster.py +++ b/RLTest/redis_cluster.py @@ -50,25 +50,40 @@ def getInformationBeforeDispose(self): def getInformationAfterDispose(self): return [shard.getInformationAfterDispose() for shard in self.shards] + def _agreeOk(self): + ok = 0 + for shard in self.shards: + con = shard.getConnection() + try: + status = con.execute_command('CLUSTER', 'INFO') + except Exception as e: + print('got error on cluster slots, will try again, %s' % str(e)) + continue + if 'cluster_state:ok' in str(status): + ok += 1 + return ok == len(self.shards) + + def _agreeSlots(self): + ok = 0 + first_view = None + for shard in self.shards: + con = shard.getConnection() + try: + slots_view = con.execute_command('CLUSTER', 'SLOTS') + except Exception as e: + print('got error on cluster slots, will try again, %s' % str(e)) + continue + if first_view is None: + first_view = slots_view + if slots_view == first_view: + ok += 1 + return ok == len(self.shards) + def waitCluster(self, timeout_sec=40): st = time.time() - ok = 0 while st + timeout_sec > time.time(): - ok = 0 - first_view = None - for shard in self.shards: - con = shard.getConnection() - try: - slots_pov = con.execute_command('CLUSTER', 'SLOTS') - except Exception as e: - print('got error on cluster info, will try again, %s' % str(e)) - continue - if first_view is None: - first_view = slots_pov - if slots_pov == first_view: - ok += 1 - if ok == len(self.shards): + if self._agreeOk() and self._agreeSlots(): for shard in self.shards: try: shard.getConnection().execute_command('SEARCH.CLUSTERREFRESH') From ae0b47e0621bb0f474f73ecb299e3118f0772651 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Mon, 17 Nov 2025 09:28:46 +0200 Subject: [PATCH 4/4] minor improvements --- RLTest/env.py | 2 +- RLTest/redis_cluster.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/RLTest/env.py b/RLTest/env.py index 9083209..b488e85 100644 --- a/RLTest/env.py +++ b/RLTest/env.py @@ -398,7 +398,7 @@ def getClusterConnectionIfNeeded(self): return self.getConnection() def waitCluster(self, timeout_sec=40): - if isinstance(self.envRunner, ClusterEnv) or isinstance(self.envRunner, EnterpriseRedisClusterEnv): + if isinstance(self.envRunner, (ClusterEnv, EnterpriseRedisClusterEnv)): self.envRunner.waitCluster(timeout_sec) def addShardToClusterIfExists(self): diff --git a/RLTest/redis_cluster.py b/RLTest/redis_cluster.py index c2b1e78..8b70313 100644 --- a/RLTest/redis_cluster.py +++ b/RLTest/redis_cluster.py @@ -57,7 +57,7 @@ def _agreeOk(self): try: status = con.execute_command('CLUSTER', 'INFO') except Exception as e: - print('got error on cluster slots, will try again, %s' % str(e)) + print('got error on cluster info, will try again, %s' % str(e)) continue if 'cluster_state:ok' in str(status): ok += 1