python/rocketmq/v5/util/message_id_codec.py (60 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from datetime import datetime, timezone
from os import getpid
from time import time
from uuid import getnode

from rocketmq.v5.util.atomic import AtomicInteger

#
# The codec for the message-id.
#
# <p>Codec here provides the following two functions:
# 1. Provide decoding function of message-id of all versions above v0.
# 2. Provide a generator of message-id of v1 version.
#
# <p>The message-id of version V1 and later consists of 17 bytes in total
# (34 hexadecimal characters). The first byte represents the version number.
# For V1, this byte is 0x01, rendered as the two characters "01".
#
# <h3>V1 message id example</h3>
#
# <pre>
# ┌──┬────────────┬────┬────────┬────────┐
# │01│56F7E71C361B│21BC│024CCDBE│00000000│
# └──┴────────────┴────┴────────┴────────┘
# </pre>
#
# <h3>V1 version message id generation rules</h3>
#
# <pre>
#                 process id(lower 2bytes)
#                             ▲
# mac address(lower 6bytes)   │   sequence number(big endian)
#                    ▲        │          ▲ (4bytes)
#                    │        │          │
#              ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
#       0x01+  │     6     │ │2│ │ 4 │ │ 4 │
#              └───────────┘ └─┘ └─┬─┘ └───┘
#                                  │
#                                  ▼
#              seconds since 2021-01-01 00:00:00(UTC+0)
#                           (lower 4bytes)
# </pre>
#


class MessageIdCodec:
    """Singleton generator of V1 message ids.

    Every call to ``MessageIdCodec()`` returns the same instance; creation
    and one-time initialisation are both guarded by ``_instance_lock``.

    A generated id is a 34-character upper-case hexadecimal string:
    version ("01"), node/MAC value (6 bytes), low 2 bytes of the process id,
    low 4 bytes of the seconds elapsed since 2021-01-01 00:00:00 UTC, and a
    4-byte big-endian sequence number.
    """

    MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34
    MESSAGE_ID_VERSION_V0 = "00"
    MESSAGE_ID_VERSION_V1 = "01"

    # Keeps the timestamp and sequence fields inside their 4-byte slots.
    _UINT32_MASK = 0xFFFFFFFF

    _instance_lock = threading.Lock()
    # Sequence counter shared by the whole class (and therefore the process).
    # NOTE(review): presumably thread-safe, going by the helper's name;
    # confirm against rocketmq.v5.util.atomic.
    __index = AtomicInteger()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        with MessageIdCodec._instance_lock:
            if not hasattr(MessageIdCodec, "_instance"):
                MessageIdCodec._instance = object.__new__(cls)
        return MessageIdCodec._instance

    def __init__(self):
        """Compute the per-process id prefix and the time base, exactly once.

        ``__init__`` runs on every ``MessageIdCodec()`` call because
        ``__new__`` always hands back the same object, so the ``initialized``
        flag turns every call after the first into a no-op.
        """
        with MessageIdCodec._instance_lock:
            if hasattr(self, "initialized"):
                return

            # Fixed part of every id: 6 bytes from uuid.getnode() followed by
            # the low 2 bytes of the process id.
            mac = getnode().to_bytes(6, byteorder="big")
            pid_low = (getpid() & 0xFFFF).to_bytes(2, byteorder="big")
            self.process_fixed_string_v1 = (mac + pid_low).hex().upper()

            # Seconds between the custom epoch (2021-01-01 00:00:00 UTC) and
            # the moment this codec was initialised.
            custom_epoch = datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
            self.seconds_since_custom_epoch = int(
                (datetime.now(timezone.utc) - custom_epoch).total_seconds()
            )
            self.seconds_start_timestamp = int(time())
            self.seconds = self.__delta_time()
            self.sequence = None
            self.initialized = True

    def next_message_id(self):
        """Build and return the next V1 message id.

        Returns:
            A 34-character upper-case hexadecimal string: the "01" version
            prefix, the fixed node/process part, the 4-byte timestamp and the
            4-byte sequence number.
        """
        delta_seconds = self.__delta_time()
        if self.seconds != delta_seconds:
            self.seconds = delta_seconds

        mask = MessageIdCodec._UINT32_MASK
        # Only the low 4 bytes of the elapsed seconds go into the id.
        timestamp = (delta_seconds & mask).to_bytes(4, byteorder="big")
        # The counter only ever grows; without the mask int.to_bytes(4) would
        # raise OverflowError once it passes 2**32 - 1 and id generation would
        # fail from then on. Masking makes the field wrap around instead.
        sequence = (self.__sequence_id() & mask).to_bytes(4, byteorder="big")

        return (
            MessageIdCodec.MESSAGE_ID_VERSION_V1
            + self.process_fixed_string_v1
            + (timestamp + sequence).hex().upper()
        )

    @staticmethod
    def decode(message_id):
        """Return ``message_id`` unchanged.

        No parsing or validation is performed at present.
        """
        return message_id

    # private

    def __delta_time(self):
        """Return the whole seconds elapsed since 2021-01-01 00:00:00 UTC.

        Computed as the epoch offset captured at initialisation plus the
        wall-clock seconds that have passed since then.
        """
        return (
            int(time())
            - self.seconds_start_timestamp
            + self.seconds_since_custom_epoch
        )

    def __sequence_id(self):
        """Take the next value of the shared counter, record and return it."""
        self.sequence = MessageIdCodec.__index.get_and_increment()
        return self.sequence