in redis/cluster.py [0:0]
def _internal_execute_command(self, *args, **kwargs):
"""
Wrapper for ERRORS_ALLOW_RETRY error handling.
It will try the number of times specified by the config option
"self.cluster_error_retry_attempts" which defaults to 3 unless manually
configured.
If it reaches the number of times, the command will raise the exception
Key argument :target_nodes: can be passed with the following types:
nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
ClusterNode
list<ClusterNode>
dict<Any, ClusterNode>
"""
target_nodes_specified = False
is_default_node = False
target_nodes = None
passed_targets = kwargs.pop("target_nodes", None)
if passed_targets is not None and not self._is_nodes_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
# If an error that allows retrying was thrown, the nodes and slots
# cache were reinitialized. We will retry executing the command with
# the updated cluster setup only when the target nodes can be
# determined again with the new cache tables. Therefore, when target
# nodes were passed to this function, we cannot retry the command
# execution since the nodes may not be valid anymore after the tables
# were reinitialized. So in case of passed target nodes,
# retry_attempts will be set to 0.
retry_attempts = (
0 if target_nodes_specified else self.cluster_error_retry_attempts
)
# Add one for the first execution
execute_attempts = 1 + retry_attempts
for _ in range(execute_attempts):
try:
res = {}
if not target_nodes_specified:
# Determine the nodes to execute the command on
target_nodes = self._determine_nodes(
*args, **kwargs, nodes_flag=passed_targets
)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {args} command on"
)
if (
len(target_nodes) == 1
and target_nodes[0] == self.get_default_node()
):
is_default_node = True
for node in target_nodes:
res[node.name] = self._execute_command(node, *args, **kwargs)
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except Exception as e:
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
if is_default_node:
# Replace the default cluster node
self.replace_default_node()
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
retry_attempts -= 1
continue
else:
# raise the exception
raise e