def handle_bq_lock()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]


def handle_bq_lock(gcs_client: storage.Client,
                   lock_blob: storage.Blob,
                   next_job_id: Optional[str],
                   table: bigquery.TableReference,
                   retry_attempt_cnt: Optional[int] = None):
    """Reclaim the lock blob for the new job id (in-place) or delete the lock
    blob if next_job_id is None."""
    try:
        if next_job_id:
            lock_blob_contents = json.dumps(
                dict(job_id=next_job_id,
                     table=table.to_api_repr(),
                     retry_attempt_cnt=retry_attempt_cnt))
            logging.log_with_table(
                table,
                f"Writing the following content to lock_blob {lock_blob.name}:"
                f" {dict(job_id=next_job_id, table=table.to_api_repr(), retry_attempt_cnt=retry_attempt_cnt)}"
            )
            if lock_blob.exists(client=gcs_client):
                lock_blob.upload_from_string(
                    lock_blob_contents,
                    if_generation_match=lock_blob.generation,
                    client=gcs_client)
            else:  # This happens when submitting the first job in the backlog
                lock_blob.upload_from_string(
                    lock_blob_contents,
                    if_generation_match=0,  # noqa: E126
                    client=gcs_client)
        else:
            logging.log_with_table(
                table, "releasing lock at: "
                f"gs://{lock_blob.bucket.name}/{lock_blob.name}")
            lock_blob.delete(
                if_generation_match=lock_blob.generation,
                client=gcs_client,
            )
    except (google.api_core.exceptions.PreconditionFailed,
            google.api_core.exceptions.NotFound) as err:
        if isinstance(err, google.api_core.exceptions.PreconditionFailed):
            raise exceptions.BacklogException(
                f"The lock at gs://{lock_blob.bucket.name}/{lock_blob.name} "
                f"was changed by another process.") from err
        logging.log_with_table(
            table, "Tried deleting a lock blob that was either already deleted "
            "or never existed.")