def background_event_to_cloud_event()

in src/functions_framework/event_conversion.py [0:0]


def background_event_to_cloud_event(request) -> CloudEvent:
    """Build a CloudEvent from a background event carried by an HTTP request.

    The request is marshalled into background-event data, split into its
    payload and context, and then re-expressed as CloudEvent attributes plus
    data. Pub/Sub, Firebase Auth and Firebase DB events get service-specific
    adjustments to the payload, subject or source.

    Args:
        request: The incoming HTTP request; it is handed unchanged to
            ``marshal_background_event_data``.

    Returns:
        A ``CloudEvent`` constructed from the derived attributes and the
        (possibly adjusted) event payload.

    Raises:
        EventConversionException: If marshalling yields nothing usable, if the
            background event type has no CloudEvent counterpart, or if a
            Firebase DB event carries no ``domain`` field.
    """
    raw_event = marshal_background_event_data(request)
    if not raw_event:
        raise EventConversionException("Failed to parse JSON")

    legacy_event = BackgroundEvent(**raw_event)
    payload = legacy_event.data
    context = Context(**legacy_event.context)

    legacy_type = context.event_type
    if legacy_type not in _BACKGROUND_TO_CE_TYPE:
        raise EventConversionException(
            f'Unable to find CloudEvent equivalent type for "{legacy_type}"'
        )
    ce_type = _BACKGROUND_TO_CE_TYPE[legacy_type]

    service, resource, subject = _split_resource(context)

    # Pub/Sub: back-fill message fields from the context when the payload
    # lacks them, then nest the payload under a "message" key.
    if service == _PUBSUB_CE_SERVICE:
        fallbacks = (
            ("messageId", context.event_id),
            ("publishTime", context.timestamp),
        )
        for field_name, fallback in fallbacks:
            if field_name not in payload:
                payload[field_name] = fallback
        payload = {"message": payload}

    # Firebase Auth: rename metadata keys in place and derive the subject
    # from the user's uid when one is present.
    if service == _FIREBASE_AUTH_CE_SERVICE:
        if "metadata" in payload:
            auth_metadata = payload["metadata"]
            for (
                legacy_key,
                ce_key,
            ) in _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE.items():
                if legacy_key in auth_metadata:
                    auth_metadata[ce_key] = auth_metadata[legacy_key]
                    del auth_metadata[legacy_key]
        if "uid" in payload:
            subject = f"users/{payload['uid']}"

    # Firebase DB: the resource (and therefore the source) is prefixed with a
    # location taken from the legacy event's 'domain' field.
    if service == _FIREBASE_DB_CE_SERVICE:
        if "domain" not in raw_event:
            raise EventConversionException(
                "Invalid FirebaseDB event payload: missing 'domain'"
            )

        domain = raw_event["domain"]
        # "firebaseio.com" maps to us-central1; any other domain contributes
        # its first dot-separated label as the location.
        location = (
            "us-central1" if domain == "firebaseio.com" else domain.split(".")[0]
        )
        resource = f"projects/_/locations/{location}/{resource}"

    # The source is assembled only now so that it reflects any resource
    # rewrite performed above.
    attributes = {
        "id": context.event_id,
        "time": context.timestamp,
        "specversion": _CLOUD_EVENT_SPEC_VERSION,
        "datacontenttype": "application/json",
        "type": ce_type,
        "source": f"//{service}/{resource}",
    }

    # The subject attribute is only set when a non-empty subject was derived.
    if subject:
        attributes["subject"] = subject

    return CloudEvent(attributes, payload)