azurefunctions-extensions-bindings-eventhub/azurefunctions/extensions/bindings/eventhub/eventDataConverter.py (47 lines of code) (raw):
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import collections.abc
from typing import Any, Optional, get_args, get_origin
from azurefunctions.extensions.base import Datum, InConverter, OutConverter
from .eventData import EventData
class EventDataConverter(
    InConverter,
    OutConverter,
    binding="eventHub",
    trigger="eventHubTrigger",
):
    """Converter registered for ``eventHub`` / ``eventHubTrigger``.

    Checks whether an annotation is ``EventData`` (or an iterable of
    ``EventData``) and decodes incoming model-binding data by wrapping it
    in ``EventData`` and calling ``get_sdk_type()``.
    """

    @classmethod
    def check_input_type_annotation(cls, pytype: type) -> bool:
        """Return True when *pytype* is an annotation this converter accepts.

        Accepted annotations are a class that is ``EventData`` or a subclass
        of it, or a parameterized iterable with exactly one type argument
        that is such a class (for example ``List[EventData]``).

        Args:
            pytype: The annotation to inspect; may be ``None``.

        Returns:
            ``True`` if the annotation is supported, ``False`` otherwise.
        """
        if pytype is None:
            return False

        # The annotation is a class/type (not an object) - not iterable
        if isinstance(pytype, type) and issubclass(pytype, EventData):
            return True

        # An iterable with exactly one inner type that subclasses EventData
        return cls._is_iterable_supported_type(pytype)

    @classmethod
    def _is_iterable_supported_type(cls, annotation: type) -> bool:
        """Return True for a generic iterable of a single EventData type.

        Args:
            annotation: The annotation to inspect, e.g. ``List[EventData]``.

        Returns:
            ``True`` when the annotation's origin is an iterable class and
            its only type argument is ``EventData`` or a subclass of it;
            ``False`` for every other annotation.
        """
        # Check base type from type hint. Ex: list from List[EventData]
        origin = get_origin(annotation)

        # get_origin() returns None for non-generic annotations and may
        # return a non-class object (e.g. typing.Union for Optional[...],
        # typing.Literal). issubclass() raises TypeError on those, so they
        # are rejected here instead of crashing the annotation check.
        if not isinstance(origin, type):
            return False
        if not issubclass(origin, collections.abc.Iterable):
            return False

        # get_args() always returns a tuple, so only its length matters.
        inner_types = get_args(annotation)
        if len(inner_types) != 1:
            return False

        inner_type = inner_types[0]
        return (isinstance(inner_type, type)
                and issubclass(inner_type, EventData))

    @classmethod
    def decode(cls, data: Datum, *, trigger_metadata, pytype) -> Optional[Any]:
        """
        EventHub allows for batches. This means the cardinality can be one or
        many.

        When the cardinality is one:
            - The data is of type "model_binding_data" - each event is an
              independent function invocation

        When the cardinality is many:
            - The data is of type "collection_model_binding_data" - all
              events are sent in a single function invocation
            - collection_model_binding_data has 1 or more model_binding_data
              objects

        Args:
            data: The incoming datum; its ``type`` selects the decode path.
            trigger_metadata: Not used by this method.
            pytype: Not used by this method.

        Returns:
            ``None`` when *data* or ``data.type`` is ``None``; a list of
            ``EventData(...).get_sdk_type()`` results for a collection; a
            single ``get_sdk_type()`` result for one model_binding_data.

        Raises:
            ValueError: If decoding a collection fails for any reason, or if
                ``data.type`` is neither of the two supported types.
        """
        if data is None or data.type is None:
            return None

        # Process each model_binding_data in the collection
        if data.type == "collection_model_binding_data":
            try:
                return [EventData(data=mbd).get_sdk_type()
                        for mbd in data.value.model_binding_data]
            except Exception as e:
                # Re-raise as ValueError, keeping the original as the cause.
                raise ValueError(
                    f"Failed to decode incoming EventHub batch: {e!r}"
                ) from e

        # Get model_binding_data fields directly
        if data.type == "model_binding_data":
            return EventData(data=data.value).get_sdk_type()

        raise ValueError(
            "Unexpected type of data received for the 'eventhub' binding: "
            f"{data.type!r}"
        )