in redis/cluster.py [0:0]
def _execute_command(self, target_node, *args, **kwargs):
"""
Send a command to a node in the cluster
"""
command = args[0]
redis_node = None
connection = None
redirect_addr = None
asking = False
moved = False
ttl = int(self.RedisClusterRequestTTL)
while ttl > 0:
ttl -= 1
try:
if asking:
target_node = self.get_node(node_name=redirect_addr)
elif moved:
# MOVED occurred and the slots cache was updated,
# refresh the target node
slot = self.determine_slot(*args)
target_node = self.nodes_manager.get_node_from_slot(
slot,
self.read_from_replicas and command in READ_COMMANDS,
self.load_balancing_strategy
if command in READ_COMMANDS
else None,
)
moved = False
redis_node = self.get_redis_connection(target_node)
connection = get_connection(redis_node)
if asking:
connection.send_command("ASKING")
redis_node.parse_response(connection, "ASKING", **kwargs)
asking = False
connection.send_command(*args, **kwargs)
response = redis_node.parse_response(connection, command, **kwargs)
# Remove keys entry, it needs only for cache.
kwargs.pop("keys", None)
if command in self.cluster_response_callbacks:
response = self.cluster_response_callbacks[command](
response, **kwargs
)
return response
except AuthenticationError:
raise
except (ConnectionError, TimeoutError) as e:
# Connection retries are being handled in the node's
# Retry object.
# ConnectionError can also be raised if we couldn't get a
# connection from the pool before timing out, so check that
# this is an actual connection before attempting to disconnect.
if connection is not None:
connection.disconnect()
# Remove the failed node from the startup nodes before we try
# to reinitialize the cluster
self.nodes_manager.startup_nodes.pop(target_node.name, None)
# Reset the cluster node's connection
target_node.redis_connection = None
self.nodes_manager.initialize()
raise e
except MovedError as e:
# First, we will try to patch the slots/nodes cache with the
# redirected node output and try again. If MovedError exceeds
# 'reinitialize_steps' number of times, we will force
# reinitializing the tables, and then try again.
# 'reinitialize_steps' counter will increase faster when
# the same client object is shared between multiple threads. To
# reduce the frequency you can set this variable in the
# RedisCluster constructor.
self.reinitialize_counter += 1
if self._should_reinitialized():
self.nodes_manager.initialize()
# Reset the counter
self.reinitialize_counter = 0
else:
self.nodes_manager.update_moved_exception(e)
moved = True
except TryAgainError:
if ttl < self.RedisClusterRequestTTL / 2:
time.sleep(0.05)
except AskError as e:
redirect_addr = get_node_name(host=e.host, port=e.port)
asking = True
except ClusterDownError as e:
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
# and retry executing the command
time.sleep(0.25)
self.nodes_manager.initialize()
raise e
except ResponseError:
raise
except Exception as e:
if connection:
connection.disconnect()
raise e
finally:
if connection is not None:
redis_node.connection_pool.release(connection)
raise ClusterError("TTL exhausted.")