def _execute_command()

in redis/cluster.py [0:0]


    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove keys entry, it needs only for cache.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except (ConnectionError, TimeoutError) as e:
                # Connection retries are being handled in the node's
                # Retry object.
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except ClusterDownError as e:
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command
                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise e
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")