in src/functions_framework/event_conversion.py [0:0]
def cloud_event_to_background_event(request) -> Tuple[Any, Context]:
    """Convert the CloudEvent carried by an HTTP request into a background event.

    The request's headers and body are parsed with ``from_http``; the
    resulting event's data and attributes are then reshaped into the
    ``(data, context)`` pair returned to the caller.

    How ``resource`` and ``data`` are built depends on the service part of
    the CloudEvent ``source`` (as returned by ``_split_ce_source``):

    * Pub/Sub: ``resource`` is a dict with ``service``, ``name`` and
      ``type``; ``data`` is unwrapped from its ``"message"`` key when present
      and the ``"messageId"`` / ``"publishTime"`` keys are removed.
    * Firebase Auth: ``resource`` is the source name; keys under
      ``data["metadata"]`` are renamed according to
      ``_FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND``.
    * Storage: ``resource`` is a dict whose ``name`` is
      ``"<name>/<subject>"`` and whose ``type`` is ``data["kind"]``.
    * Firebase Realtime Database: the ``/locations/<segment>`` part is
      stripped from the name before ``"/<subject>"`` is appended.
    * Anything else: ``resource`` is ``"<name>/<subject>"``.

    Args:
        request: The incoming HTTP request. Only ``request.headers`` and
            ``request.get_data()`` are used.

    Returns:
        A ``(data, context)`` tuple, where ``context`` is a ``Context`` built
        from the event's ``id``, ``time``, mapped type and the resource.

    Raises:
        EventConversionException: If the CloudEvent type has no entry in
            ``_CE_TO_BACKGROUND_TYPE``, or if parsing/reshaping fails with an
            ``AttributeError``, ``KeyError``, ``TypeError`` or
            ``MissingRequiredFields`` (the original error is attached as
            ``__cause__``).
    """
    try:
        event = from_http(request.headers, request.get_data())
        data = event.data
        service, name = _split_ce_source(event["source"])

        # Read the type once; it is needed for validation, the error message
        # and the final mapping lookup.
        ce_type = event["type"]
        if ce_type not in _CE_TO_BACKGROUND_TYPE:
            raise EventConversionException(
                f'Unable to find background event equivalent type for "{ce_type}"'
            )

        if service == _PUBSUB_CE_SERVICE:
            resource = {"service": service, "name": name, "type": _PUBSUB_MESSAGE_TYPE}
            # The payload may be wrapped in a "message" envelope; unwrap it
            # first so the key removals below act on the message itself.
            if "message" in data:
                data = data["message"]
            # Membership checks (rather than dict.pop) are kept deliberately:
            # they leave non-dict payloads untouched instead of raising.
            if "messageId" in data:
                del data["messageId"]
            if "publishTime" in data:
                del data["publishTime"]
        elif service == _FIREBASE_AUTH_CE_SERVICE:
            resource = name
            if "metadata" in data:
                metadata = data["metadata"]
                # Rename metadata keys from their CloudEvent spelling to the
                # background-event spelling.
                for old, new in _FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND.items():
                    if old in metadata:
                        metadata[new] = metadata[old]
                        del metadata[old]
        elif service == _STORAGE_CE_SERVICE:
            resource = {
                "name": f"{name}/{event['subject']}",
                "service": service,
                "type": data["kind"],
            }
        elif service == _FIREBASE_DB_CE_SERVICE:
            # Drop the "/locations/<segment>" component from the source name.
            name = re.sub("/locations/[^/]+", "", name)
            resource = f"{name}/{event['subject']}"
        else:
            resource = f"{name}/{event['subject']}"

        context = Context(
            eventId=event["id"],
            timestamp=event["time"],
            eventType=_CE_TO_BACKGROUND_TYPE[ce_type],
            resource=resource,
        )
        return (data, context)
    except (AttributeError, KeyError, TypeError, MissingRequiredFields) as err:
        # Chain the low-level error so the root cause stays visible in
        # tracebacks instead of being reported as an incidental context.
        raise EventConversionException(
            "Failed to convert CloudEvent to BackgroundEvent."
        ) from err