in python/rocketmq/producer.py [0:0]
def take_message_queues(self, excluded: Set[Endpoints], count: int):
"""Fetch a specified number of message queues, excluding the ones provided.
It will first try to fetch from non-excluded brokers and if insufficient,
it will select from the excluded ones.
"""
next_index = self.get_and_increment_index()
candidates = []
candidate_broker_name = set()
queue_num = len(self.__message_queues)
for i in range(queue_num):
mq = self.__message_queues[next_index % queue_num]
next_index = next_index + 1
if (
mq.broker.endpoints not in excluded
and mq.broker.name not in candidate_broker_name
):
candidate_broker_name.add(mq.broker.name)
candidates.append(mq)
if len(candidates) >= count:
return candidates
# if all endpoints are isolated
if candidates:
return candidates
for i in range(queue_num):
mq = self.__message_queues[next_index % queue_num]
if mq.broker.name not in candidate_broker_name:
candidate_broker_name.add(mq.broker.name)
candidates.append(mq)
if len(candidates) >= count:
return candidates
return candidates