in webhook/main.py [0:0]
def entrypoint(request: object) -> Mapping[str, str]:
    """Route an incoming HTTP request to the handler matching its JSON body.

    The JSON payload is inspected in order:

    1. ``kind == "storage#object"``: the payload is handed unchanged to
       ``redirect_and_reply``.
    2. A ``"bucket"`` key is present: ``cloud_event_entrypoint`` is called
       with the payload's ``name``, ``id``, ``bucket`` and ``timeCreated``
       (the latter passed through ``coerce_datetime_zulu``).
    3. A ``"text"`` key is present: ``summarization_entrypoint`` is called
       with the payload's ``name`` and ``text``, the current UTC time, and
       the fixed event id ``"CURL_TRIGGER"``.

    Args:
        request: The incoming request; it must provide ``get_json()``.

    Returns:
        Whatever the selected handler returns, or an empty HTTP 500
        ``flask.Response`` when the payload is not a JSON object or matches
        none of the shapes above.

    Raises:
        KeyError: If a matched payload lacks a field its branch reads
            (for example ``"name"``).
    """
    data = request.get_json()

    # get_json() can produce None or a non-object JSON value (list, string,
    # number). Such payloads cannot match any route, so answer with the same
    # 500 used for unrecognised payloads instead of failing on ``data.get``
    # or doing an accidental substring test with ``in`` on a string.
    if not isinstance(data, dict):
        return flask.Response(status=500)

    if data.get("kind") == "storage#object":
        # Entrypoint called by Pub-Sub (Eventarc)
        return redirect_and_reply(data)

    if "bucket" in data:
        # Entrypoint called by REST (possibly by redirect_and_reply)
        return cloud_event_entrypoint(
            name=data["name"],
            event_id=data["id"],
            bucket=data["bucket"],
            time_created=coerce_datetime_zulu(data["timeCreated"]),
        )

    if "text" in data:
        # Entrypoint called by REST.
        return summarization_entrypoint(
            name=data["name"],
            extracted_text=data["text"],
            time_created=datetime.datetime.now(datetime.timezone.utc),
            event_id="CURL_TRIGGER",
        )

    # No known payload shape matched.
    return flask.Response(status=500)