bigquery_etl/metadata/publish_metadata.py
"""Update metadata of BigQuery tables and views."""
from argparse import ArgumentParser
from pathlib import Path
from google.cloud import bigquery
from ..config import ConfigLoader
from ..util import standard_args
from .parse_metadata import ExternalDataFormat, Metadata
METADATA_FILE = "metadata.yaml"
DEFAULT_PATTERN = (
f"{ConfigLoader.get('default', 'project', fallback='moz-fx-data-shared-prod')}:*.*"
)
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"patterns",
metavar="[project:]dataset[.table]",
default=[DEFAULT_PATTERN],
nargs="*",
help="Table that should have a latest-version view, may use shell-style wildcards,"
f" defaults to: {DEFAULT_PATTERN}",
)
parser.add_argument("--target", help="File or directory containing metadata files")
standard_args.add_log_level(parser)


class InvalidExternalDataConfigException(Exception):
"""Raised invalid config for external data tables."""
def publish_metadata(client, project, dataset, table, metadata):
"""Push metadata to BigQuery tables."""
try:
table_ref = client.dataset(dataset).table(table)
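        # Fetch the live table so that only its metadata fields are modified.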
table = client.get_table(table_ref)
if metadata.friendly_name is not None:
table.friendly_name = metadata.friendly_name
if metadata.description is not None:
table.description = metadata.description
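        # BigQuery only accepts string-valued labels, so drop anything else:
        # https://cloud.google.com/bigquery/docs/labels-intro#requirements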
table.labels = {
key: value
for key, value in metadata.labels.items()
if isinstance(value, str)
}
if metadata.deprecated is True:
table.labels["deprecated"] = "true"
if metadata.deletion_date:
table.labels["deletion_date"] = metadata.deletion_date.strftime("%Y-%m-%d")
# TODO: in the future we can consider updating the table expiration date based on deletion_date
if metadata.monitoring and metadata.monitoring.enabled:
table.labels["monitoring"] = "true"
client.update_table(table, ["friendly_name", "description", "labels"])
print("Published metadata for: {}.{}.{}".format(project, dataset, table))
except Exception as e:
print(e)
def attach_metadata(artifact_file_path: Path, table: bigquery.Table) -> None:
"""Add metadata from query file's metadata.yaml to table object."""
try:
if artifact_file_path.is_file() and artifact_file_path.name == METADATA_FILE:
metadata = Metadata.from_file(artifact_file_path)
else:
metadata = Metadata.of_query_file(artifact_file_path)
except FileNotFoundError:
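        # No metadata.yaml for this artifact; nothing to attach.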
return
table.description = metadata.description
table.friendly_name = metadata.friendly_name
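    # Mirror the partitioning settings from metadata.yaml onto the table object.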
if metadata.bigquery and metadata.bigquery.time_partitioning:
table.time_partitioning = bigquery.TimePartitioning(
metadata.bigquery.time_partitioning.type.bigquery_type,
field=metadata.bigquery.time_partitioning.field,
require_partition_filter=(
metadata.bigquery.time_partitioning.require_partition_filter
),
expiration_ms=metadata.bigquery.time_partitioning.expiration_ms,
)
elif metadata.bigquery and metadata.bigquery.range_partitioning:
table.range_partitioning = bigquery.RangePartitioning(
field=metadata.bigquery.range_partitioning.field,
range_=bigquery.PartitionRange(
start=metadata.bigquery.range_partitioning.range.start,
end=metadata.bigquery.range_partitioning.range.end,
interval=metadata.bigquery.range_partitioning.range.interval,
),
)
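    # Clustering can be applied with either partitioning scheme (or none).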
if metadata.bigquery and metadata.bigquery.clustering:
table.clustering_fields = metadata.bigquery.clustering.fields
# BigQuery only allows for string type labels with specific requirements to be published:
# https://cloud.google.com/bigquery/docs/labels-intro#requirements
if metadata.labels:
table.labels = {
key: value
for key, value in metadata.labels.items()
if isinstance(value, str)
        }


def attach_external_data_config(artifact_file_path, table) -> None:
"""Add external data metadata from query file's metadata.yaml to table object."""
try:
if artifact_file_path.is_file() and artifact_file_path.name == METADATA_FILE:
metadata = Metadata.from_file(artifact_file_path)
else:
metadata = Metadata.of_query_file(artifact_file_path)
except FileNotFoundError:
raise InvalidExternalDataConfigException(
f"Invalid metadata: External data table "
f"{artifact_file_path} missing metadata file"
)
if not metadata.external_data:
raise InvalidExternalDataConfigException(
f"Invalid metadata: External data table "
f"{artifact_file_path} has no external_data config"
)
if metadata.external_data.format not in (
ExternalDataFormat.GOOGLE_SHEETS,
ExternalDataFormat.CSV,
):
raise InvalidExternalDataConfigException(
f"Invalid metadata: External data table "
f"{artifact_file_path} has unsupported format {metadata.external_data.format}"
)
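    # Build the external table configuration; the format's value is uppercased to
    # match BigQuery's source format names (GOOGLE_SHEETS, CSV).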
external_config = bigquery.ExternalConfig(
metadata.external_data.format.value.upper()
)
external_config.source_uris = metadata.external_data.source_uris
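    # Tolerate values not described by the schema and use the declared schema
    # rather than autodetecting it.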
external_config.ignore_unknown_values = True
external_config.autodetect = False
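    # Copy any remaining options (e.g. skip_leading_rows) onto the
    # format-specific options object.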
for key, v in metadata.external_data.options.items():
setattr(external_config.options, key, v)
table.external_data_configuration = external_config