def start_backfill_subscriber_if_not_running()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py [0:0]


def start_backfill_subscriber_if_not_running(
        gcs_client: Optional[storage.Client], bkt: storage.Bucket,
        table_prefix: str) -> Optional[storage.Blob]:
    """Start the backfill subscriber if it is not already running for this
    table prefix.

    Creates the backfill file for the table prefix if it does not exist yet.
    If constants.START_BACKFILL_FILENAME is set, nothing is created until a
    file with that name is present under the table prefix.

    Args:
        gcs_client: client used for all GCS calls. If None, a new
            storage.Client is constructed with constants.CLIENT_INFO.
        bkt: bucket that holds the table prefix.
        table_prefix: object name prefix of the table; the start and
            backfill file names are appended to it after a "/".

    Returns:
        The backfill blob, whether it was created by this call or already
        existed. None if the start-backfill file is required but was not
        found.

    Raises:
        Exceptions raised by the GCS calls, other than PreconditionFailed on
        the upload, are not caught here and propagate to the caller.
    """
    if gcs_client is None:
        gcs_client = storage.Client(client_info=constants.CLIENT_INFO)

    # Do not start subscriber if a START_BACKFILL_FILENAME has been defined
    # in an environment variable and the file has not yet been dropped
    # at the table prefix.
    if constants.START_BACKFILL_FILENAME:
        start_backfill_blob = bkt.blob(
            f"{table_prefix}/{constants.START_BACKFILL_FILENAME}")
        if not start_backfill_blob.exists(client=gcs_client):
            # The trailing space after "because" keeps the adjacent string
            # literals from running together in the log line.
            print("Not triggering backfill because "
                  f"gs://{start_backfill_blob.bucket.name}/"
                  f"{start_backfill_blob.name} was not found.")
            return None

    # Create a _BACKFILL file for this table if not exists
    backfill_blob = bkt.blob(f"{table_prefix}/{constants.BACKFILL_FILENAME}")
    try:
        backfill_blob.upload_from_string(
            "",
            # Setting if_generation_match below to 0 makes the operation
            # succeed only if there are no live versions of the blob.
            if_generation_match=0,
            client=gcs_client)
    except google.api_core.exceptions.PreconditionFailed:
        # The blob already exists; reload it so that its metadata
        # (e.g. time_created) is populated for the log message below.
        backfill_blob.reload(client=gcs_client)
        print("backfill already in progress due to: "
              f"gs://{backfill_blob.bucket.name}/{backfill_blob.name} "
              f"created at {backfill_blob.time_created}. exiting.")
    else:
        print("triggered backfill with "
              f"gs://{backfill_blob.bucket.name}/{backfill_blob.name} "
              f"created at {backfill_blob.time_created}.")
    return backfill_blob