in src/functions_framework/event_conversion.py [0:0]
def _split_resource(context: Context) -> Tuple[str, str, str]:
"""Splits a background event's resource into a CloudEvent service, resource, and subject."""
service = ""
resource = ""
if isinstance(context.resource, dict):
service = context.resource.get("service", "")
resource = context.resource["name"]
else:
resource = context.resource
# If there's no service we'll choose an appropriate one based on the event type.
if not service:
for b_service, ce_service in _SERVICE_BACKGROUND_TO_CE.items():
if context.event_type.startswith(b_service):
service = ce_service
break
if not service:
raise EventConversionException(
"Unable to find CloudEvent equivalent service "
f"for {context.event_type}"
)
# If we don't need to split the resource string then we're done.
if service not in _CE_SERVICE_TO_RESOURCE_RE:
return service, resource, ""
# Split resource into resource and subject.
match = _CE_SERVICE_TO_RESOURCE_RE[service].fullmatch(resource)
if not match:
raise EventConversionException("Resource regex did not match")
return service, match.group(1), match.group(2)