in azure/functions/blob.py [0:0]
def decode(cls, data: meta.Datum, *, trigger_metadata) -> Any:
    """Build an ``InputStream`` from the datum received for a "blob" binding.

    Args:
        data: Datum carrying the blob payload. A ``type`` of ``'string'``
            means ``value`` is text and is encoded to UTF-8 bytes; a
            ``type`` of ``'bytes'`` means ``value`` is used unchanged.
        trigger_metadata: Metadata accompanying the invocation, read through
            ``cls._decode_trigger_metadata_field``. When falsy, the returned
            stream carries the payload only.

    Returns:
        ``None`` if ``data`` is ``None`` or has no type; otherwise an
        ``InputStream`` wrapping the payload and, when trigger metadata is
        present, the ``'BlobTrigger'`` name, length, ``'Uri'``, properties
        and metadata decoded from it.

    Raises:
        ValueError: If ``data.type`` is neither ``'string'`` nor ``'bytes'``.
    """
    if data is None or data.type is None:
        return None

    data_type = data.type
    if data_type == 'string':
        payload = data.value.encode('utf-8')
    elif data_type == 'bytes':
        payload = data.value
    else:
        raise ValueError(
            f'unexpected type of data received for the "blob" binding '
            f': {data_type!r}'
        )

    if not trigger_metadata:
        return InputStream(data=payload)

    properties = cls._decode_trigger_metadata_field(
        trigger_metadata, 'Properties', python_type=dict)
    # An empty/missing properties dict is normalised to None, as before.
    blob_properties = properties if properties else None

    length = None
    if properties:
        # 'ContentLength' takes precedence over 'Length'.
        for key in ('ContentLength', 'Length'):
            raw_length = properties.get(key)
            # Only None and '' mean "not provided". A size of 0 (or '0') is
            # a valid length for an empty blob and must not be dropped by a
            # truthiness check.
            if raw_length is not None and raw_length != '':
                length = int(raw_length)
                break

    metadata = None
    try:
        metadata = cls._decode_trigger_metadata_field(
            trigger_metadata, 'Metadata', python_type=dict)
    except (KeyError, ValueError):
        # Deliberately best-effort: the type of the Metadata field is
        # unclear, so a failure to decode it must not fail the whole
        # conversion; metadata simply stays None.
        pass

    return InputStream(
        data=payload,
        name=cls._decode_trigger_metadata_field(
            trigger_metadata, 'BlobTrigger', python_type=str),
        length=length,
        uri=cls._decode_trigger_metadata_field(
            trigger_metadata, 'Uri', python_type=str),
        blob_properties=blob_properties,
        metadata=metadata,
    )