azurefunctions-extensions-bindings-eventhub/azurefunctions/extensions/bindings/eventhub/eventData.py (29 lines of code) (raw):

# Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. from typing import Optional import uamqp from azure.eventhub import EventData as EventDataSDK from azurefunctions.extensions.base import Datum, SdkType class EventData(SdkType): def __init__(self, *, data: Datum) -> None: # model_binding_data properties self._data = data self._version = None self._source = None self._content_type = None self._content = None self.decoded_message = None if self._data: self._version = data.version self._source = data.source self._content_type = data.content_type self._content = data.content self.decoded_message = self._get_eventhub_content(self._content) def _get_eventhub_content(self, content): """ When receiving the EventBindingData, the content field is in the form of bytes. This content must be decoded in order to construct an EventData object from the azure.eventhub SDK. The .NET worker uses the Azure.Core.Amqp library to do this: https://github.com/Azure/azure-functions-dotnet-worker/blob/main/extensions/Worker.Extensions.EventHubs/src/EventDataConverter.cs#L45 """ if content: try: return uamqp.Message().decode_from_bytes(content) except Exception as e: raise ValueError(f"Failed to decode EventHub content: {e}") from e return None def get_sdk_type(self) -> Optional[EventDataSDK]: """ When receiving an EventHub message, the content portion after being decoded is used in the constructor to create an EventData object. This will contain fields such as message, enqueued_time, and more. """ # https://github.com/Azure/azure-sdk-for-python/issues/39711 if self.decoded_message: return EventDataSDK._from_message(self.decoded_message) return None