def cloud_event_to_background_event()

in src/functions_framework/event_conversion.py [0:0]


def cloud_event_to_background_event(request) -> Tuple[Any, Context]:
    """Converts the CloudEvent in the given HTTP request into a background event.

    The CloudEvent is parsed from the request's headers and body with
    ``from_http``. Its ``source`` attribute is split into a service and a
    resource name, which determine how the background event ``resource`` is
    built and whether the payload is reshaped.

    Note that the payload is adjusted in place for some services (keys are
    removed or renamed on the parsed event data itself).

    Args:
        request: The incoming HTTP request. Must expose ``headers`` and
            ``get_data()``.

    Returns:
        A ``(data, context)`` tuple where ``data`` is the (possibly reshaped)
        event payload and ``context`` is the background event ``Context``.

    Raises:
        EventConversionException: If the CloudEvent type has no background
            event equivalent, or if the event is missing required fields or
            otherwise cannot be converted.
    """
    try:
        event = from_http(request.headers, request.get_data())
        data = event.data
        service, name = _split_ce_source(event["source"])

        if event["type"] not in _CE_TO_BACKGROUND_TYPE:
            raise EventConversionException(
                f'Unable to find background event equivalent type for "{event["type"]}"'
            )

        if service == _PUBSUB_CE_SERVICE:
            resource = {"service": service, "name": name, "type": _PUBSUB_MESSAGE_TYPE}
            # The payload may wrap the message under a "message" key; the
            # background event data is the message itself.
            if "message" in data:
                data = data["message"]
            # Drop the message id and publish time from the payload.
            if "messageId" in data:
                del data["messageId"]
            if "publishTime" in data:
                del data["publishTime"]
        elif service == _FIREBASE_AUTH_CE_SERVICE:
            resource = name
            if "metadata" in data:
                # Rename metadata fields from their CloudEvent names to the
                # background event names.
                for old, new in _FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND.items():
                    if old in data["metadata"]:
                        data["metadata"][new] = data["metadata"][old]
                        del data["metadata"][old]
        elif service == _STORAGE_CE_SERVICE:
            resource = {
                "name": f"{name}/{event['subject']}",
                "service": service,
                "type": data["kind"],
            }
        elif service == _FIREBASE_DB_CE_SERVICE:
            # Strip any "/locations/<location>" segment from the resource name.
            name = re.sub(r"/locations/[^/]+", "", name)
            resource = f"{name}/{event['subject']}"
        else:
            resource = f"{name}/{event['subject']}"

        context = Context(
            eventId=event["id"],
            timestamp=event["time"],
            eventType=_CE_TO_BACKGROUND_TYPE[event["type"]],
            resource=resource,
        )
        return (data, context)
    except (AttributeError, KeyError, TypeError, MissingRequiredFields) as err:
        # Chain the underlying error explicitly so the root cause (for example
        # a missing attribute or an unexpected payload shape) stays visible.
        raise EventConversionException(
            "Failed to convert CloudEvent to BackgroundEvent."
        ) from err