in redis/cluster.py [0:0]
def execute_command(self, *args):
"""
Execute a subscribe/unsubscribe command.
Taken code from redis-py and tweak to make it work within a cluster.
"""
# NOTE: don't parse the response in this function -- it could pull a
# legitimate message off the stack if the connection is already
# subscribed to one or more channels
if self.connection is None:
if self.connection_pool is None:
if len(args) > 1:
# Hash the first channel and get one of the nodes holding
# this slot
channel = args[1]
slot = self.cluster.keyslot(channel)
node = self.cluster.nodes_manager.get_node_from_slot(
slot,
self.cluster.read_from_replicas,
self.cluster.load_balancing_strategy,
)
else:
# Get a random node
node = self.cluster.get_random_node()
self.node = node
redis_connection = self.cluster.get_redis_connection(node)
self.connection_pool = redis_connection.connection_pool
self.connection = self.connection_pool.get_connection()
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection.register_connect_callback(self.on_connect)
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
self._event_dispatcher.dispatch(
AfterPubSubConnectionInstantiationEvent(
self.connection, self.connection_pool, ClientType.SYNC, self._lock
)
)
connection = self.connection
self._execute(connection, connection.send_command, *args)