in api-connector/src/handler.py [0:0]
def execute(request: flask.Request) -> Tuple[Any, int]:
req_json = request.get_json(silent=True)
if not req_json:
raise Exception("Invalid request")
source = Utils.get_property(req_json, "source") or Sources.BIGQUERY_ROUTINE.name
try:
match (Sources[source]):
case Sources.BIGQUERY_ROUTINE:
(res, _) = BigQueryRoutineRequest.execute(req_json)
return res, 200
case Sources.CLOUD_TASK:
(res, _) = CloudTaskRequest.execute(req_json)
return res, 202 # This status code avoids retries by Cloud Tasks.
except KeyError as ke:
msg = f"Source of type '{source}' is not supported."
logger.error(msg)
return msg, 422