+ def __create_redis_clients(self, config):
+ clients = list()
+ cfg_params = config.get_params()
+ if cfg_params.db_cluster_addr_list is None:
+ clients.append(self.__create_legacy_redis_client(cfg_params))
+ else:
+ for addr in cfg_params.db_cluster_addr_list.split(","):
+ client = self.__create_redis_client(cfg_params, addr)
+ clients.append(client)
+ return clients
+
+ def __create_legacy_redis_client(self, cfg_params):
+ return self.__create_redis_client(cfg_params, cfg_params.db_host)
+
+ def __create_redis_client(self, cfg_params, addr):
+ new_sentinel = None
+ new_redis = None
+ if cfg_params.db_sentinel_port is None:
+ new_redis = Redis(host=addr, port=cfg_params.db_port, db=0, max_connections=20)
+ else:
+ sentinel_node = (addr, cfg_params.db_sentinel_port)
+ master_name = cfg_params.db_sentinel_master_name
+ new_sentinel = Sentinel([sentinel_node])
+ new_redis = new_sentinel.master_for(master_name)
+
+ new_redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
+ new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
+
+ redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
+ pubsub_thread = threading.Thread(target=None)
+ run_in_thread = False
+
+ return _RedisConn(new_redis, redis_pubsub, pubsub_thread, run_in_thread)
+
+ def __getClientConns(self):
+ return self.clients
+
+ def __getClientConn(self, ns):
+ clients_cnt = len(self.clients)
+ client_id = self.__get_hash(ns) % clients_cnt
+ return self.clients[client_id]
+
+ def __getClient(self, ns):
+ clients_cnt = len(self.clients)
+ client_id = 0
+ if clients_cnt > 1:
+ client_id = self.__get_hash(ns) % clients_cnt
+ return self.clients[client_id].redis_client
+
+ @classmethod
+ def __get_hash(cls, str):
+ return zlib.crc32(str.encode())
+