diff --git a/RLTest/env.py b/RLTest/env.py index 925fa62..b488e85 100644 --- a/RLTest/env.py +++ b/RLTest/env.py @@ -397,6 +397,10 @@ def getClusterConnectionIfNeeded(self): else: return self.getConnection() + def waitCluster(self, timeout_sec=40): + if isinstance(self.envRunner, (ClusterEnv, EnterpriseRedisClusterEnv)): + self.envRunner.waitCluster(timeout_sec) + def addShardToClusterIfExists(self): if isinstance(self.envRunner, ClusterEnv): test_fname = self.testName.replace(':', '_') diff --git a/RLTest/redis_cluster.py b/RLTest/redis_cluster.py index 8c412bb..8b70313 100644 --- a/RLTest/redis_cluster.py +++ b/RLTest/redis_cluster.py @@ -48,24 +48,42 @@ def getInformationBeforeDispose(self): return [shard.getInformationBeforeDispose() for shard in self.shards] def getInformationAfterDispose(self): - return [shard.getInformationAfterDispose() for shard in self.shards] + return [shard.getInformationAfterDispose() for shard in self.shards] + + def _agreeOk(self): + ok = 0 + for shard in self.shards: + con = shard.getConnection() + try: + status = con.execute_command('CLUSTER', 'INFO') + except Exception as e: + print('got error on cluster info, will try again, %s' % str(e)) + continue + if 'cluster_state:ok' in str(status): + ok += 1 + return ok == len(self.shards) + + def _agreeSlots(self): + ok = 0 + first_view = None + for shard in self.shards: + con = shard.getConnection() + try: + slots_view = con.execute_command('CLUSTER', 'SLOTS') + except Exception as e: + print('got error on cluster slots, will try again, %s' % str(e)) + continue + if first_view is None: + first_view = slots_view + if slots_view == first_view: + ok += 1 + return ok == len(self.shards) def waitCluster(self, timeout_sec=40): st = time.time() - ok = 0 while st + timeout_sec > time.time(): - ok = 0 - for shard in self.shards: - con = shard.getConnection() - try: - status = con.execute_command('CLUSTER', 'INFO') - except Exception as e: - print('got error on cluster info, will try again, %s' % str(e)) - continue - if 'cluster_state:ok' in str(status): - ok += 1 - if ok == len(self.shards): + if self._agreeOk() and self._agreeSlots(): for shard in self.shards: try: shard.getConnection().execute_command('SEARCH.CLUSTERREFRESH')