in src/functions_framework/event_conversion.py [0:0]
def background_event_to_cloud_event(request) -> CloudEvent:
"""Converts a background event represented by the given HTTP request into a CloudEvent."""
event_data = marshal_background_event_data(request)
if not event_data:
raise EventConversionException("Failed to parse JSON")
event_object = BackgroundEvent(**event_data)
data = event_object.data
context = Context(**event_object.context)
if context.event_type not in _BACKGROUND_TO_CE_TYPE:
raise EventConversionException(
f'Unable to find CloudEvent equivalent type for "{context.event_type}"'
)
new_type = _BACKGROUND_TO_CE_TYPE[context.event_type]
service, resource, subject = _split_resource(context)
source = f"//{service}/{resource}"
# Handle Pub/Sub events.
if service == _PUBSUB_CE_SERVICE:
if "messageId" not in data:
data["messageId"] = context.event_id
if "publishTime" not in data:
data["publishTime"] = context.timestamp
data = {"message": data}
# Handle Firebase Auth events.
if service == _FIREBASE_AUTH_CE_SERVICE:
if "metadata" in data:
for old, new in _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE.items():
if old in data["metadata"]:
data["metadata"][new] = data["metadata"][old]
del data["metadata"][old]
if "uid" in data:
uid = data["uid"]
subject = f"users/{uid}"
# Handle Firebase DB events.
if service == _FIREBASE_DB_CE_SERVICE:
# The CE source of firebasedatabase CloudEvents includes location information
# that is inferred from the 'domain' field of legacy events.
if "domain" not in event_data:
raise EventConversionException(
"Invalid FirebaseDB event payload: missing 'domain'"
)
domain = event_data["domain"]
location = "us-central1"
if domain != "firebaseio.com":
location = domain.split(".")[0]
resource = f"projects/_/locations/{location}/{resource}"
source = f"//{service}/{resource}"
metadata = {
"id": context.event_id,
"time": context.timestamp,
"specversion": _CLOUD_EVENT_SPEC_VERSION,
"datacontenttype": "application/json",
"type": new_type,
"source": source,
}
if subject:
metadata["subject"] = subject
return CloudEvent(metadata, data)