in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py
# Imports this function relies on (reconstructed; `exceptions` and `logging`
# are assumed to be this package's local modules providing BacklogException
# and log_with_table, since neither name refers to the standard library):
import json
from typing import Optional

import google.api_core.exceptions
from google.cloud import bigquery
from google.cloud import storage

from . import exceptions
from . import logging


def handle_bq_lock(gcs_client: storage.Client,
                   lock_blob: storage.Blob,
                   next_job_id: Optional[str],
                   table: bigquery.TableReference,
                   retry_attempt_cnt: Optional[int] = None):
"""Reclaim the lock blob for the new job id (in-place) or delete the lock
blob if next_job_id is None."""
    try:
        if next_job_id:
            lock_blob_contents = json.dumps(
                dict(job_id=next_job_id,
                     table=table.to_api_repr(),
                     retry_attempt_cnt=retry_attempt_cnt))
            logging.log_with_table(
                table,
                f"Writing the following content to lock_blob {lock_blob.name}:"
                f" {lock_blob_contents}")
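            # GCS write preconditions provide the mutual exclusion here:
            # if_generation_match pinned to the blob's current generation only
            # succeeds if no other process changed the lock since we read it,
            # while if_generation_match=0 only succeeds if the object does not
            # exist yet. Losing a race raises PreconditionFailed (handled
            # below).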
            if lock_blob.exists(client=gcs_client):
                lock_blob.upload_from_string(
                    lock_blob_contents,
                    if_generation_match=lock_blob.generation,
                    client=gcs_client)
            else:  # This happens when submitting the first job in the backlog
                lock_blob.upload_from_string(
                    lock_blob_contents,
                    if_generation_match=0,  # noqa: E126
                    client=gcs_client)
        else:
            logging.log_with_table(
                table, "releasing lock at: "
                f"gs://{lock_blob.bucket.name}/{lock_blob.name}")
            lock_blob.delete(
                if_generation_match=lock_blob.generation,
                client=gcs_client,
            )
    except (google.api_core.exceptions.PreconditionFailed,
            google.api_core.exceptions.NotFound) as err:
        if isinstance(err, google.api_core.exceptions.PreconditionFailed):
            raise exceptions.BacklogException(
                f"The lock at gs://{lock_blob.bucket.name}/{lock_blob.name} "
                f"was changed by another process.") from err
        # NotFound is benign: it just means the lock blob was already gone.
        logging.log_with_table(
            table, "Tried deleting a lock blob that was either already deleted "
            "or never existed.")