python/rocketmq/v5/model/message.py (203 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from rocketmq.grpc_protocol import DigestType, Encoding, definition_pb2 from rocketmq.v5.exception import IllegalArgumentException from rocketmq.v5.util import MessageIdCodec, Misc class Message: def __init__(self): self.__body = None self.__topic = None self.__namespace = None self.__message_id = None self.__tag = None self.__message_group = None self.__delivery_timestamp = None self.__keys = set() self.__properties = dict() self.__born_host = None self.__born_timestamp = None self.__delivery_attempt = None self.__receipt_handle = None self.__message_type = None self.__endpoints = None def __str__(self) -> str: return ( f"topic:{self.__topic}, tag:{self.__tag}, messageGroup:{self.__message_group}, " f"deliveryTimestamp:{self.__delivery_timestamp}, keys:{self.__keys}, properties:{self.__properties}" ) def fromProtobuf(self, message: definition_pb2.Message): # noqa try: self.__message_body_check_sum(message) self.__topic = message.topic.name self.__namespace = message.topic.resource_namespace self.__message_id = MessageIdCodec.decode( message.system_properties.message_id ) self.__body = self.__uncompress_body(message) self.__tag = message.system_properties.tag self.__message_group = message.system_properties.message_group self.__born_host = message.system_properties.born_host self.__born_timestamp = message.system_properties.born_timestamp.seconds self.__delivery_attempt = message.system_properties.delivery_attempt self.__delivery_timestamp = message.system_properties.delivery_timestamp self.__receipt_handle = message.system_properties.receipt_handle self.__message_type = message.system_properties.message_type if message.system_properties.keys is not None: self.__keys.update(message.system_properties.keys) if message.user_properties is not None: self.__properties.update(message.user_properties) return self except Exception as e: raise e """ private """ @staticmethod def __message_body_check_sum(message): if message.system_properties.body_digest.type == DigestType.CRC32: crc32_sum = Misc.crc32_checksum(message.body) if message.system_properties.body_digest.checksum != crc32_sum: raise Exception( f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {crc32_sum}" ) elif message.system_properties.body_digest.type == DigestType.MD5: md5_sum = Misc.md5_checksum(message.body) if message.system_properties.body_digest.checksum != md5_sum: raise Exception( f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {md5_sum}" ) elif message.system_properties.body_digest.type == DigestType.SHA1: sha1_sum = Misc.sha1_checksum(message.body) if message.system_properties.body_digest.checksum != sha1_sum: raise Exception( f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {sha1_sum}" ) else: raise Exception( f"unsupported message body digest algorithm, {message.system_properties.body_digest.type}," f" {message.topic}, {message.system_properties.message_id}" ) @staticmethod def __uncompress_body(message): if message.system_properties.body_encoding == Encoding.GZIP: return Misc.uncompress_bytes_gzip(message.body) elif message.system_properties.body_encoding == Encoding.IDENTITY: return message.body else: raise Exception( f"unsupported message encoding algorithm, {message.system_properties.body_encoding}, {message.topic}, {message.system_properties.message_id}" ) """ property """ @property def body(self): return self.__body @property def topic(self): return self.__topic @property def namespace(self): return self.__namespace @property def message_id(self): return self.__message_id @property def tag(self): return self.__tag @property def message_group(self): return self.__message_group @property def delivery_timestamp(self): return self.__delivery_timestamp @property def keys(self): return self.__keys @property def properties(self): return self.__properties @property def born_host(self): return self.__born_host @property def born_timestamp(self): return self.__born_timestamp @property def delivery_attempt(self): return self.__delivery_attempt @property def receipt_handle(self): return self.__receipt_handle @property def message_type(self): return self.__message_type @property def endpoints(self): return self.__endpoints @body.setter def body(self, body): if body is None or body.strip() == "": raise IllegalArgumentException("body should not be blank") self.__body = body @topic.setter def topic(self, topic): if topic is None or topic.strip() == "": raise IllegalArgumentException("topic has not been set yet") if Misc.is_valid_topic(topic): self.__topic = topic else: raise IllegalArgumentException( f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]" ) @message_id.setter def message_id(self, message_id): self.__message_id = message_id @tag.setter def tag(self, tag): if tag is None or tag.strip() == "": raise IllegalArgumentException("tag should not be blank") if "|" in tag: raise IllegalArgumentException('tag should not contain "|"') self.__tag = tag @message_group.setter def message_group(self, message_group): if self.__delivery_timestamp is not None: raise IllegalArgumentException( "deliveryTimestamp and messageGroup should not be set at same time" ) if message_group is None or len(message_group) == 0: raise IllegalArgumentException("messageGroup should not be blank") self.__message_group = message_group @delivery_timestamp.setter def delivery_timestamp(self, delivery_timestamp): if self.__message_group is not None: raise IllegalArgumentException( "deliveryTimestamp and messageGroup should not be set at same time" ) self.__delivery_timestamp = delivery_timestamp @keys.setter def keys(self, *keys): for key in keys: if not key or key.strip() == "": raise IllegalArgumentException("key should not be blank") self.__keys.update(set(keys)) @receipt_handle.setter def receipt_handle(self, receipt_handle): self.__receipt_handle = receipt_handle @message_type.setter def message_type(self, message_type): self.__message_type = message_type @endpoints.setter def endpoints(self, endpoints): self.__endpoints = endpoints def add_property(self, key, value): if key is None or key.strip() == "": raise IllegalArgumentException("key should not be blank") if value is None or value.strip() == "": raise IllegalArgumentException("value should not be blank") self.__properties[key] = value @staticmethod def message_type_desc(message_type): if message_type == 1: return "NORMAL" elif message_type == 2: return "FIFO" elif message_type == 3: return "DELAY" elif message_type == 4: return "TRANSACTION" else: return "MESSAGE_TYPE_UNSPECIFIED"