in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py [0:0]
def start_backfill_subscriber_if_not_running(
        gcs_client: Optional[storage.Client], bkt: storage.Bucket,
        table_prefix: str) -> Optional[storage.Blob]:
    """Start the backfill subscriber if it is not already running for this
    table prefix.

    Creates a backfill file for the table prefix if it does not already
    exist; the existence of that file is what marks a backfill as running.

    Args:
        gcs_client: storage client to use; a new one is constructed when
            None is passed.
        bkt: bucket containing the table prefix.
        table_prefix: GCS prefix identifying the table to backfill.

    Returns:
        The backfill blob (newly created, or the pre-existing one when a
        backfill is already in progress), or None when backfill was not
        triggered because the start-backfill marker file is absent.
    """
    if gcs_client is None:
        gcs_client = storage.Client(client_info=constants.CLIENT_INFO)
    start_backfill = True
    # Do not start subscriber if a START_BACKFILL_FILENAME has been defined
    # in an environment variable and the file has not yet been dropped
    # at the table prefix.
    if constants.START_BACKFILL_FILENAME:
        start_backfill_blob = bkt.blob(
            f"{table_prefix}/{constants.START_BACKFILL_FILENAME}")
        start_backfill = start_backfill_blob.exists(client=gcs_client)
        if not start_backfill:
            # Fix: trailing space added so the message does not render as
            # "becausegs://...".
            print("Not triggering backfill because "
                  f"gs://{start_backfill_blob.bucket.name}/"
                  f"{start_backfill_blob.name} was not found.")
    if start_backfill:
        # Create a _BACKFILL file for this table if not exists
        backfill_blob = bkt.blob(
            f"{table_prefix}/{constants.BACKFILL_FILENAME}")
        try:
            backfill_blob.upload_from_string(
                "",
                # Setting if_generation_match below to 0 makes the operation
                # succeed only if there are no live versions of the blob,
                # i.e. creation is atomic: exactly one caller wins the race.
                if_generation_match=0,
                client=gcs_client)
            print("triggered backfill with "
                  f"gs://{backfill_blob.bucket.name}/{backfill_blob.name} "
                  f"created at {backfill_blob.time_created}.")
            return backfill_blob
        except google.api_core.exceptions.PreconditionFailed:
            # Another invocation already created the blob; reload to report
            # the existing blob's metadata (e.g. time_created).
            backfill_blob.reload(client=gcs_client)
            print("backfill already in progress due to: "
                  f"gs://{backfill_blob.bucket.name}/{backfill_blob.name} "
                  f"created at {backfill_blob.time_created}. exiting.")
            return backfill_blob
    else:
        return None