def triage_event()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py [0:0]


def triage_event(gcs_client: Optional[storage.Client],
                 bq_client: Optional[bigquery.Client],
                 event_blob: storage.Blob,
                 function_start_time: float,
                 enforce_ordering: bool = False):
    """Call the appropriate handler based on the triggering event blob.

    Args:
        gcs_client: GCS client used for bucket/blob operations.
        bq_client: BigQuery client used to submit load jobs.
        event_blob: the GCS blob whose object notification triggered this
            invocation.
        function_start_time: timestamp captured at function start; passed
            through to the backlog subscriber.
        enforce_ordering: when True, route the special filenames through the
            ordering subsystem (publisher/subscriber/backfill); when False,
            submit the load job as soon as a success file lands.

    Raises:
        RuntimeError: when ordering is enforced and either a backfill file
            appears somewhere other than the table prefix level, or the
            event blob matches none of the recognized filenames.
    """
    bkt = event_blob.bucket
    basename_object_id = os.path.basename(event_blob.name)

    print(f"Received object notification for gs://{event_blob.bucket.name}/"
          f"{event_blob.name}")
    # pylint: disable=no-else-raise
    if enforce_ordering:
        # For SUCCESS files in a backlog directory, ensure that subscriber
        # is running.
        if (basename_object_id == constants.SUCCESS_FILENAME and
                "/_backlog/" in event_blob.name):
            print(f"This notification was for "
                  f"gs://{bkt.name}/{event_blob.name} a "
                  f"{constants.SUCCESS_FILENAME} in a "
                  "/_backlog/ directory. "
                  f"Waiting {constants.ENSURE_SUBSCRIBER_SECONDS} seconds to "
                  "ensure that subscriber is running.")
            ordering.subscriber_monitor(gcs_client, bkt, event_blob)
            return
        # START_BACKFILL_FILENAME may be unset/empty; only compare the
        # basename when it is configured (truthy).
        if (constants.START_BACKFILL_FILENAME and
                basename_object_id == constants.START_BACKFILL_FILENAME):
            # This will be the first backfill file.
            ordering.start_backfill_subscriber_if_not_running(
                gcs_client, bkt, utils.get_table_prefix(gcs_client, event_blob))
            return
        if basename_object_id == constants.SUCCESS_FILENAME:
            ordering.backlog_publisher(gcs_client, event_blob)
            return
        if basename_object_id == constants.BACKFILL_FILENAME:
            # Backfill files are only valid directly under the table prefix.
            if (event_blob.name !=
                    f"{utils.get_table_prefix(gcs_client, event_blob)}/"
                    f"{constants.BACKFILL_FILENAME}"):
                raise RuntimeError(
                    f"received notification for gs://{event_blob.bucket.name}/"
                    f"{event_blob.name} "
                    f"{constants.BACKFILL_FILENAME} files "
                    "are expected only at the table prefix level.")
            ordering.backlog_subscriber(gcs_client, bq_client, event_blob,
                                        function_start_time)
            return
        print(f"ERROR CAUSED BY: {basename_object_id}")
        raise RuntimeError(f"gs://{event_blob.bucket.name}/"
                           f"{event_blob.name} could not be triaged.")
    else:  # Default behavior submit job as soon as success file lands.
        if basename_object_id == constants.SUCCESS_FILENAME:
            utils.apply(
                gcs_client,
                bq_client,
                event_blob,
                None,  # no lock blob when ordering not enabled.
                utils.create_job_id(event_blob.name))