in redis/cluster.py [0:0]
def initialize(self):
"""
Initializes the nodes cache, slots cache and redis connections.
:startup_nodes:
Responsible for discovering other nodes in the cluster
"""
self.reset()
tmp_nodes_cache = {}
tmp_slots = {}
disagreements = []
startup_nodes_reachable = False
fully_covered = False
kwargs = self.connection_kwargs
exception = None
for startup_node in self.startup_nodes.values():
try:
if startup_node.redis_connection:
r = startup_node.redis_connection
else:
# Create a new Redis connection
r = self.create_redis_node(
startup_node.host, startup_node.port, **kwargs
)
self.startup_nodes[startup_node.name].redis_connection = r
# Make sure cluster mode is enabled on this node
try:
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
r.connection_pool.disconnect()
except ResponseError:
raise RedisClusterException(
"Cluster mode is not enabled on this node"
)
startup_nodes_reachable = True
except Exception as e:
# Try the next startup node.
# The exception is saved and raised only if we have no more nodes.
exception = e
continue
# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
# where each node contains the following list: [IP, port, node_id]
# Therefore, cluster_slots[0][2][0] will be the IP address of the
# primary node of the first slot section.
# If there's only one server in the cluster, its ``host`` is ''
# Fix it to the host in startup_nodes
if (
len(cluster_slots) == 1
and len(cluster_slots[0][2][0]) == 0
and len(self.startup_nodes) == 1
):
cluster_slots[0][2][0] = startup_node.host
for slot in cluster_slots:
primary_node = slot[2]
host = str_if_bytes(primary_node[0])
if host == "":
host = startup_node.host
port = int(primary_node[1])
host, port = self.remap_host_port(host, port)
nodes_for_slot = []
target_node = self._get_or_create_cluster_node(
host, port, PRIMARY, tmp_nodes_cache
)
nodes_for_slot.append(target_node)
replica_nodes = slot[3:]
for replica_node in replica_nodes:
host = str_if_bytes(replica_node[0])
port = int(replica_node[1])
host, port = self.remap_host_port(host, port)
target_replica_node = self._get_or_create_cluster_node(
host, port, REPLICA, tmp_nodes_cache
)
nodes_for_slot.append(target_replica_node)
for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = nodes_for_slot
else:
# Validate that 2 nodes want to use the same slot cache
# setup
tmp_slot = tmp_slots[i][0]
if tmp_slot.name != target_node.name:
disagreements.append(
f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
)
if len(disagreements) > 5:
raise RedisClusterException(
f"startup_nodes could not agree on a valid "
f"slots cache: {', '.join(disagreements)}"
)
fully_covered = self.check_slots_coverage(tmp_slots)
if fully_covered:
# Don't need to continue to the next startup node if all
# slots are covered
break
if not startup_nodes_reachable:
raise RedisClusterException(
f"Redis Cluster cannot be connected. Please provide at least "
f"one reachable node: {str(exception)}"
) from exception
if self._cache is None and self._cache_config is not None:
if self._cache_factory is None:
self._cache = CacheFactory(self._cache_config).get_cache()
else:
self._cache = self._cache_factory.get_cache()
# Create Redis connections to all nodes
self.create_redis_connections(list(tmp_nodes_cache.values()))
# Check if the slots are not fully covered
if not fully_covered and self._require_full_coverage:
# Despite the requirement that the slots be covered, there
# isn't a full coverage
raise RedisClusterException(
f"All slots are not covered after query all startup_nodes. "
f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
f"covered..."
)
# Set the tmp variables to the real variables
self.nodes_cache = tmp_nodes_cache
self.slots_cache = tmp_slots
# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
if self._dynamic_startup_nodes:
# Populate the startup nodes with all discovered nodes
self.startup_nodes = tmp_nodes_cache
# If initialize was called after a MovedError, clear it
self._moved_exception = None