in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py [0:0]
def triage_event(gcs_client: Optional[storage.Client],
                 bq_client: Optional[bigquery.Client],
                 event_blob: storage.Blob,
                 function_start_time: float,
                 enforce_ordering: bool = False):
    """Dispatch a GCS object notification to the matching handler.

    The handler is selected from the basename of ``event_blob.name`` and
    from whether ordering is enforced.

    Without ordering, a ``constants.SUCCESS_FILENAME`` blob is handed to
    ``utils.apply`` with no lock blob; any other blob is ignored.

    With ordering, the checks run in this sequence:
      1. success file whose name contains ``/_backlog/``
         -> ``ordering.subscriber_monitor``
      2. ``constants.START_BACKFILL_FILENAME`` (only if that constant is
         truthy) -> ``ordering.start_backfill_subscriber_if_not_running``
      3. any other success file -> ``ordering.backlog_publisher``
      4. ``constants.BACKFILL_FILENAME`` located directly under the table
         prefix -> ``ordering.backlog_subscriber``

    Args:
        gcs_client: storage client forwarded to the selected handler.
        bq_client: BigQuery client forwarded to ``utils.apply`` or
            ``ordering.backlog_subscriber``.
        event_blob: blob that triggered the notification.
        function_start_time: forwarded to ``ordering.backlog_subscriber``.
        enforce_ordering: when True, use the ordering handlers listed above.

    Raises:
        RuntimeError: with ordering enforced, when a backfill file is not at
            the table prefix level or when the blob matches none of the
            checks above.
    """
    bucket = event_blob.bucket
    object_basename = os.path.basename(event_blob.name)
    blob_uri = f"gs://{bucket.name}/{event_blob.name}"
    print(f"Received object notification for {blob_uri}")
    is_success_file = object_basename == constants.SUCCESS_FILENAME

    if not enforce_ordering:
        # Default behavior: submit the job as soon as the success file lands.
        if is_success_file:
            job_id = utils.create_job_id(event_blob.name)
            # No lock blob is passed when ordering is not enabled.
            utils.apply(gcs_client, bq_client, event_blob, None, job_id)
        return

    # NOTE: the text of the runtime messages below is kept verbatim.
    if is_success_file and "/_backlog/" in event_blob.name:
        # A success file in a backlog directory: make sure that the
        # subscriber is running.
        print(f"This notification was for {blob_uri} a "
              f"{constants.SUCCESS_FILENAME} in a /_backlog/ directory. "
              f"Watiting {constants.ENSURE_SUBSCRIBER_SECONDS} seconds "
              "to ensure that subscriber is running.")
        ordering.subscriber_monitor(gcs_client, bucket, event_blob)
        return

    start_backfill_name = constants.START_BACKFILL_FILENAME
    if start_backfill_name and object_basename == start_backfill_name:
        # This will be the first backfill file.
        table_prefix = utils.get_table_prefix(gcs_client, event_blob)
        ordering.start_backfill_subscriber_if_not_running(
            gcs_client, bucket, table_prefix)
        return

    if is_success_file:
        ordering.backlog_publisher(gcs_client, event_blob)
        return

    if object_basename == constants.BACKFILL_FILENAME:
        # Backfill files are only valid directly under the table prefix.
        expected_name = (f"{utils.get_table_prefix(gcs_client, event_blob)}/"
                         f"{constants.BACKFILL_FILENAME}")
        if event_blob.name != expected_name:
            raise RuntimeError(
                f"recieved notification for {blob_uri} "
                f"{constants.BACKFILL_FILENAME} files are expected only at "
                "the table prefix level.")
        ordering.backlog_subscriber(gcs_client, bq_client, event_blob,
                                    function_start_time)
        return

    # Nothing matched: report the offending basename and fail.
    print(f"ERROR CAUSED BY: {object_basename}")
    raise RuntimeError(f"{blob_uri} could not be triaged.")