in azure_functions_worker/bindings/datumdef.py [0:0]
def datum_as_proto(datum: Datum) -> protos.TypedData:
if datum.type == 'string':
return protos.TypedData(string=datum.value)
elif datum.type == 'bytes':
return protos.TypedData(bytes=datum.value)
elif datum.type == 'json':
return protos.TypedData(json=datum.value)
elif datum.type == 'http':
return protos.TypedData(http=protos.RpcHttp(
status_code=datum.value['status_code'].value,
headers={
k: v.value
for k, v in datum.value['headers'].items()
},
cookies=parse_to_rpc_http_cookie_list(datum.value.get('cookies')),
enable_content_negotiation=False,
body=datum_as_proto(datum.value['body']),
))
elif datum.type is None:
return None
elif datum.type == 'dict':
# TypedData doesn't support dict, so we return it as json
return protos.TypedData(json=json.dumps(datum.value))
elif datum.type == 'list':
# TypedData doesn't support list, so we return it as json
return protos.TypedData(json=json.dumps(datum.value))
elif datum.type == 'int':
return protos.TypedData(int=datum.value)
elif datum.type == 'double':
return protos.TypedData(double=datum.value)
elif datum.type == 'bool':
# TypedData doesn't support bool, so we return it as an int
return protos.TypedData(int=int(datum.value))
else:
raise NotImplementedError(
'unexpected Datum type: {!r}'.format(datum.type)
)