python/rocketmq/v5/client/balancer/queue_selector.py (68 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import random
from rocketmq.v5.exception import IllegalArgumentException
from rocketmq.v5.model import TopicRouteData
from rocketmq.v5.util import AtomicInteger
class QueueSelector:
    """Pick message queues out of a topic route, round-robin or by hash key.

    A selector holds a list of message queues, an atomic counter used as the
    round-robin cursor, and a selector type that decides which queues of a
    topic route are kept:

    * ``PRODUCER_QUEUE_SELECTOR`` keeps queues for which both
      ``is_writable()`` and ``is_master_broker()`` are true.
    * ``SIMPLE_CONSUMER_QUEUE_SELECTOR`` keeps queues for which both
      ``is_readable()`` and ``is_master_broker()`` are true.
    * ``NONE_TYPE_SELECTOR`` is the default for a directly constructed
      selector; ``select_next_queue`` refuses to work with it.
    """

    NONE_TYPE_SELECTOR = 0
    PRODUCER_QUEUE_SELECTOR = 1
    SIMPLE_CONSUMER_QUEUE_SELECTOR = 2

    def __init__(self, queues, selector_type=NONE_TYPE_SELECTOR):
        """Create a selector over ``queues``.

        Args:
            queues: list of message queues to select from (stored as given,
                without filtering or copying).
            selector_type: one of the ``*_SELECTOR`` class constants.
        """
        # The cursor starts at a random value in [1, 1000]; presumably so that
        # independent selectors do not all begin with the same queue.
        self.__index = AtomicInteger(random.randint(1, 1000))
        self.__message_queues = queues
        self.__selector_type = selector_type

    @staticmethod
    def _filter_queues(topic_route, selector_type):
        """Return the queues of ``topic_route`` usable by ``selector_type``.

        Returns a new list for the producer and simple-consumer selector
        types, and ``None`` for any other type (no filtering rule exists).
        """
        queues = topic_route.message_queues
        if selector_type == QueueSelector.PRODUCER_QUEUE_SELECTOR:
            return [
                queue
                for queue in queues
                if queue.is_writable() and queue.is_master_broker()
            ]
        if selector_type == QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR:
            return [
                queue
                for queue in queues
                if queue.is_readable() and queue.is_master_broker()
            ]
        return None

    @classmethod
    def producer_queue_selector(cls, topic_route: TopicRouteData):
        """Build a selector over the writable master-broker queues of a route."""
        return cls(
            cls._filter_queues(topic_route, QueueSelector.PRODUCER_QUEUE_SELECTOR),
            QueueSelector.PRODUCER_QUEUE_SELECTOR,
        )

    @classmethod
    def simple_consumer_queue_selector(cls, topic_route: TopicRouteData):
        """Build a selector over the readable master-broker queues of a route."""
        return cls(
            cls._filter_queues(
                topic_route, QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR
            ),
            QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR,
        )

    def select_next_queue(self):
        """Return the next queue in round-robin order.

        Raises:
            IllegalArgumentException: if the selector type is
                ``NONE_TYPE_SELECTOR`` or there is no queue to select from.
        """
        if self.__selector_type == QueueSelector.NONE_TYPE_SELECTOR:
            raise IllegalArgumentException(
                "error type for queue selector, type is NONE_TYPE_SELECTOR."
            )
        # Read the attribute once: update() may rebind it, and the length and
        # the indexed list must be the same object.
        queues = self.__message_queues
        if not queues:
            raise IllegalArgumentException(
                "no message queue available for queue selector."
            )
        return queues[self.__index.get_and_increment() % len(queues)]

    def select_queue_by_hash_key(self, key):
        """Return the queue that ``key`` maps to.

        The key is UTF-8 encoded and hashed with SHA-256; the digest, read as
        a big-endian integer, is reduced modulo the number of queues. The same
        key therefore maps to the same position as long as the queue list is
        unchanged.

        Args:
            key: string used to choose the queue.

        Raises:
            IllegalArgumentException: if there is no queue to select from.
        """
        queues = self.__message_queues
        if not queues:
            raise IllegalArgumentException(
                "no message queue available for queue selector."
            )
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        hash_code = int.from_bytes(digest, byteorder="big")
        return queues[hash_code % len(queues)]

    def all_queues(self):
        """Return every queue, rotated to start at the round-robin cursor.

        Each call advances the cursor, so successive calls start the rotation
        at successive queues. An empty queue list yields an empty list and
        leaves the cursor untouched.
        """
        queues = self.__message_queues
        if not queues:
            return []
        start = self.__index.get_and_increment() % len(queues)
        return queues[start:] + queues[:start]

    def update(self, topic_route: TopicRouteData):
        """Refresh the queue list from a new topic route.

        Nothing happens when the route's queues already equal the current
        list, or when the selector type has no filtering rule (for example
        ``NONE_TYPE_SELECTOR``). Otherwise the list is replaced by the route's
        queues filtered according to the selector type.
        """
        if topic_route.message_queues == self.__message_queues:
            return
        refreshed = self._filter_queues(topic_route, self.__selector_type)
        if refreshed is not None:
            self.__message_queues = refreshed