python/rocketmq/v5/util/message_id_codec.py (60 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from datetime import datetime, timezone
from os import getpid
from time import time
from uuid import getnode
from rocketmq.v5.util.atomic import AtomicInteger
#
# The codec for the message-id.
#
# <p>Codec here provides the following two functions:
# 1. Provide decoding function of message-id of all versions above v0.
# 2. Provide a generator of message-id of v1 version.
#
# <p>The message-id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version
# number. For V1, these two bytes are 0x0001.
#
# <h3>V1 message id example</h3>
#
# <pre>
# ┌──┬────────────┬────┬────────┬────────┐
# │01│56F7E71C361B│21BC│024CCDBE│00000000│
# └──┴────────────┴────┴────────┴────────┘
# </pre>
#
# <h3>V1 version message id generation rules</h3>
#
# <pre>
# process id(lower 2bytes)
# ▲
# mac address(lower 6bytes) │ sequence number(big endian)
# ▲ │ ▲ (4bytes)
# │ │ │
# ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
# 0x01+ │ 6 │ │2│ │ 4 │ │ 4 │
# └───────────┘ └─┘ └─┬─┘ └───┘
# │
# ▼
# seconds since 2021-01-01 00:00:00(UTC+0)
# (lower 4bytes)
# </pre>
#
class MessageIdCodec:
MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34
MESSAGE_ID_VERSION_V0 = "00"
MESSAGE_ID_VERSION_V1 = "01"
_instance_lock = threading.Lock()
__index = AtomicInteger()
def __new__(cls, *args, **kwargs):
with MessageIdCodec._instance_lock:
if not hasattr(MessageIdCodec, "_instance"):
MessageIdCodec._instance = object.__new__(cls)
return MessageIdCodec._instance
def __init__(self):
with MessageIdCodec._instance_lock:
if not hasattr(self, "initialized"):
buffer = bytearray(8)
mac = getnode().to_bytes(6, byteorder="big")
buffer[0:6] = mac
pid = getpid()
pid_buffer = bytearray(4)
pid_buffer[0:4] = pid.to_bytes(4, byteorder="big")
buffer[6:8] = pid_buffer[2:4]
self.process_fixed_string_v1 = buffer.hex().upper()
self.seconds_since_custom_epoch = int(
(
datetime.now(timezone.utc)
- datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
).total_seconds()
)
self.seconds_start_timestamp = int(time())
self.seconds = self.__delta_time()
self.sequence = None
self.initialized = True
def next_message_id(self):
delta_seconds = self.__delta_time()
if self.seconds != delta_seconds:
self.seconds = delta_seconds
buffer = bytearray(8)
buffer[0:4] = self.seconds.to_bytes(8, byteorder="big")[4:8]
buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder="big")
return (
MessageIdCodec.MESSAGE_ID_VERSION_V1
+ self.process_fixed_string_v1
+ buffer.hex().upper()
)
@staticmethod
def decode(message_id):
return message_id
""" private """
def __delta_time(self):
return (
int(time()) - self.seconds_start_timestamp + self.seconds_since_custom_epoch
)
def __sequence_id(self):
self.sequence = MessageIdCodec.__index.get_and_increment()
return self.sequence