in src/functions_framework/__init__.py [0:0]
def _event_view_func_wrapper(function, request):
    """Create the view function that feeds a request to a background function.

    Args:
        function: Callable invoked as ``function(data, context)``.
        request: Request object the view reads from; it is captured by the
            closure rather than supplied on each call.

    Returns:
        The decorated view callable. It takes ``path`` (unused in the body),
        calls ``function`` with the extracted data and context, and returns
        ``"OK"``. When the fallback payload is falsy it calls
        ``flask.abort(400)`` before building the event.
    """

    def _extract_event():
        """Return the ``(data, context)`` pair for the captured request."""
        # Convertible CloudEvents are translated into the equivalent
        # background event data and context.
        if event_conversion.is_convertable_cloud_event(request):
            return event_conversion.cloud_event_to_background_event(request)

        # Binary content mode: the whole request body is the data, and the
        # context attributes are read from the ``ce-*`` request headers.
        if is_binary(request.headers):
            body = request.get_data()
            header_context = Context(
                eventId=request.headers.get("ce-eventId"),
                timestamp=request.headers.get("ce-timestamp"),
                eventType=request.headers.get("ce-eventType"),
                resource=request.headers.get("ce-resource"),
            )
            return body, header_context

        # Fallback: marshal the request into background event data and
        # reject the request when nothing usable comes back.
        payload = event_conversion.marshal_background_event_data(request)
        if not payload:
            flask.abort(400)
        background_event = BackgroundEvent(**payload)
        return background_event.data, Context(**background_event.context)

    @execution_id.set_execution_context(request, _enable_execution_id_logging())
    def view_func(path):
        data, context = _extract_event()
        function(data, context)
        return "OK"

    return view_func