def _event_view_func_wrapper()

in src/functions_framework/__init__.py [0:0]


def _event_view_func_wrapper(function, request):
    @execution_id.set_execution_context(request, _enable_execution_id_logging())
    def view_func(path):
        if event_conversion.is_convertable_cloud_event(request):
            # Convert this CloudEvent to the equivalent background event data and context.
            data, context = event_conversion.cloud_event_to_background_event(request)
            function(data, context)
        elif is_binary(request.headers):
            # Support CloudEvents in binary content mode, with data being the
            # whole request body and context attributes retrieved from request
            # headers.
            data = request.get_data()
            context = Context(
                eventId=request.headers.get("ce-eventId"),
                timestamp=request.headers.get("ce-timestamp"),
                eventType=request.headers.get("ce-eventType"),
                resource=request.headers.get("ce-resource"),
            )
            function(data, context)
        else:
            # This is a regular CloudEvent
            event_data = event_conversion.marshal_background_event_data(request)
            if not event_data:
                flask.abort(400)
            event_object = BackgroundEvent(**event_data)
            data = event_object.data
            context = Context(**event_object.context)
            function(data, context)

        return "OK"

    return view_func