azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py (23 lines of code) (raw):

# Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. import uamqp import uuid _X_OPT_LOCK_TOKEN = b"x-opt-lock-token" def get_lock_token(message: bytes, index: int) -> str: # Get the lock token from the message lock_token_encoded = message[:index] # Convert the lock token to a UUID using the first 16 bytes lock_token_uuid = uuid.UUID(bytes=lock_token_encoded[:16]) return lock_token_uuid def get_amqp_message(message: bytes, index: int): """ Get the amqp message from the model_binding_data content and create the message. """ amqp_message = message[index + len(_X_OPT_LOCK_TOKEN):] decoded_message = uamqp.Message().decode_from_bytes(amqp_message) return decoded_message def get_decoded_message(content: bytes): """ First, find the end of the lock token. Then, get the lock token UUID and create the delivery annotations dictionary. Finally, get the amqp message and set the delivery annotations. Once the delivery annotations have been set, the amqp message is ready to return. """ if content: try: index = content.find(_X_OPT_LOCK_TOKEN) lock_token = get_lock_token(content, index) delivery_anno_dict = {_X_OPT_LOCK_TOKEN: lock_token} decoded_message = get_amqp_message(content, index) decoded_message.delivery_annotations = delivery_anno_dict return decoded_message except Exception as e: raise ValueError(f"Failed to decode ServiceBus content: {e}") from e return None