python/rocketmq/v5/model/topic_route.py (95 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rocketmq.grpc_protocol import Permission, definition_pb2
from rocketmq.v5.client.connection import RpcEndpoints
from rocketmq.v5.model import Message
class MessageQueue:
    """Client-side snapshot of a single message queue of a topic.

    The constructor copies the topic, broker, permission and accepted message
    types out of a queue object (presumably a ``definition_pb2.MessageQueue``,
    since :meth:`message_queue0` writes the same fields back into one).

    Instances compare by value and are hashable, so they can be stored in
    sets, used as dict keys, or placed in a tuple that is itself hashed.
    """

    # Broker id that identifies the master broker (see is_master_broker).
    MASTER_BROKER_ID = 0

    def __init__(self, queue):
        """Copy the fields of ``queue`` into this object.

        Args:
            queue: object exposing ``topic.name``, ``topic.resource_namespace``,
                ``id``, ``permission``, ``broker.name``, ``broker.id``,
                ``broker.endpoints`` and an iterable ``accept_message_types``.
        """
        self.__topic = queue.topic.name
        self.__namespace = queue.topic.resource_namespace
        self.__queue_id = queue.id
        self.__permission = queue.permission
        self.__broker_name = queue.broker.name
        self.__broker_id = queue.broker.id
        self.__broker_endpoints = RpcEndpoints(queue.broker.endpoints)
        # Stored as a set: duplicates and ordering are irrelevant.
        self.__accept_message_types = set(queue.accept_message_types)

    def is_readable(self):
        """Return True when the permission is READ or READ_WRITE."""
        return self.__permission in (Permission.READ, Permission.READ_WRITE)

    def is_writable(self):
        """Return True when the permission is WRITE or READ_WRITE."""
        return self.__permission in (Permission.WRITE, Permission.READ_WRITE)

    def is_master_broker(self):
        """Return True when the broker id equals ``MASTER_BROKER_ID``."""
        return self.__broker_id == MessageQueue.MASTER_BROKER_ID

    def __eq__(self, other: object) -> bool:
        """Compare every stored field; non-``MessageQueue`` objects are unequal."""
        if not isinstance(other, MessageQueue):
            return False
        return (
            self.__topic == other.__topic
            and self.__namespace == other.__namespace
            and self.__queue_id == other.__queue_id
            and self.__permission == other.__permission
            and self.__broker_name == other.__broker_name
            and self.__broker_id == other.__broker_id
            and self.__broker_endpoints == other.__broker_endpoints
            # Both sides are sets, so plain set equality is order-independent
            # and does not require the elements to be sortable.
            and self.__accept_message_types == other.__accept_message_types
        )

    def __hash__(self) -> int:
        """Hash the identity fields of the queue.

        Defining ``__eq__`` alone makes a class unhashable, which in turn
        breaks any ``hash(tuple(queues))`` over these objects. Only a subset
        of the fields compared in ``__eq__`` is hashed: equal queues still
        hash equally, and the endpoints object and the mutable type set are
        kept out of the hash.
        """
        return hash(
            (
                self.__topic,
                self.__namespace,
                self.__queue_id,
                self.__broker_name,
                self.__broker_id,
            )
        )

    def __str__(self):
        """Return ``<broker name>.<topic>.<queue id>``."""
        return f"{self.__broker_name}.{self.__topic}.{self.__queue_id}"

    def message_queue0(self):
        """Build a ``definition_pb2.MessageQueue`` from the stored fields."""
        queue = definition_pb2.MessageQueue()  # noqa
        queue.topic.name = self.__topic
        queue.topic.resource_namespace = self.__namespace
        queue.id = self.__queue_id
        queue.permission = self.__permission
        queue.broker.name = self.__broker_name
        queue.broker.id = self.__broker_id
        queue.broker.endpoints.CopyFrom(self.__broker_endpoints.endpoints)
        queue.accept_message_types.extend(self.__accept_message_types)
        return queue

    def accept_message_types_desc(self):
        """Return the accepted message type descriptions joined by commas.

        Each type is rendered with ``Message.message_type_desc``; the result
        is an empty string when no types are accepted. Order follows set
        iteration order.
        """
        return ",".join(
            Message.message_type_desc(message_type)
            for message_type in self.__accept_message_types
        )

    # ---- properties ----

    @property
    def endpoints(self) -> "RpcEndpoints":
        """The ``RpcEndpoints`` wrapper built from the broker endpoints."""
        return self.__broker_endpoints

    @property
    def accept_message_types(self):
        """The set of accepted message types (the internal set, not a copy)."""
        return self.__accept_message_types
class TopicRouteData:
    """Route information for a topic: the list of its message queues."""

    def __init__(self, message_queues):
        """Wrap every element of ``message_queues`` in a ``MessageQueue``."""
        self.__message_queues = [MessageQueue(item) for item in message_queues]

    def __eq__(self, other):
        """Two routes are equal when their queue lists are equal."""
        if other is self:
            return True
        # isinstance() is already False for None, so one check covers both.
        if not isinstance(other, TopicRouteData):
            return False
        return self.__message_queues == other.__message_queues

    def __hash__(self):
        """Hash the queues as a tuple; every queue must itself be hashable."""
        queues = tuple(self.__message_queues)
        return hash(queues)

    def __str__(self):
        """Return ``message_queues:(<q1>, <q2>, ...)`` using ``str`` of each queue."""
        rendered = ", ".join(map(str, self.__message_queues))
        return "message_queues:(" + rendered + ")"

    def all_endpoints(self):
        """Map each queue's ``endpoints.facade`` to its endpoints object.

        When several queues share a facade, the entry of the last such queue
        in the list is kept.
        """
        return {
            mq.endpoints.facade: mq.endpoints for mq in self.__message_queues
        }

    # ---- properties ----

    @property
    def message_queues(self):
        """The internal list of message queues (not a copy)."""
        return self.__message_queues