in common/py_libs/bq_helper.py [0:0]
def label_dataset(bq_client: bigquery.Client,
                  dataset: bigquery.Dataset) -> None:
    """Adds Cortex label to BigQuery Dataset.

    Only updates dataset if label does not exist already in dataset.
    Label keys that are already present on the dataset keep their current
    values; only the missing keys from `constants.BQ_DATASET_LABEL` are added.

    If the caller is not permitted to update the dataset, the failure is
    logged at info level and the function returns without raising.

    Args:
        bq_client (bigquery.Client): BigQuery client to use.
        dataset (bigquery.Dataset): Dataset to label.

    Raises:
        NotFoundCError: If dataset does not exist.
    """
    current_labels = dataset.labels or {}
    missing_labels = {
        key: value
        for key, value in constants.BQ_DATASET_LABEL.items()
        if key not in current_labels
    }
    if not missing_labels:
        # Every expected label key is already present - no API call needed.
        return

    dataset.labels = {**current_labels, **missing_labels}
    try:
        bq_client.update_dataset(dataset, ["labels"])
    except NotFound as e:
        error_msg = (
            f"Dataset {dataset.project}.{dataset.dataset_id} not found.")
        # Chain from the caught instance, not the NotFound class: a class
        # cause is instantiated with no arguments, which drops the original
        # error details and can fail if the constructor requires arguments.
        raise exc.NotFoundCError(error_msg) from e
    except Forbidden:
        # Labeling is best-effort: lack of permission is not fatal.
        logger.info(
            "Permission to tag %s.%s denied. Skipping tagging dataset.",
            dataset.project,
            dataset.dataset_id)