in bigquery_etl/metadata/publish_metadata.py [0:0]
def attach_metadata(artifact_file_path: Path, table: bigquery.Table) -> None:
"""Add metadata from query file's metadata.yaml to table object."""
try:
if artifact_file_path.is_file() and artifact_file_path.name == METADATA_FILE:
metadata = Metadata.from_file(artifact_file_path)
else:
metadata = Metadata.of_query_file(artifact_file_path)
except FileNotFoundError:
return
table.description = metadata.description
table.friendly_name = metadata.friendly_name
if metadata.bigquery and metadata.bigquery.time_partitioning:
table.time_partitioning = bigquery.TimePartitioning(
metadata.bigquery.time_partitioning.type.bigquery_type,
field=metadata.bigquery.time_partitioning.field,
require_partition_filter=(
metadata.bigquery.time_partitioning.require_partition_filter
),
expiration_ms=metadata.bigquery.time_partitioning.expiration_ms,
)
elif metadata.bigquery and metadata.bigquery.range_partitioning:
table.range_partitioning = bigquery.RangePartitioning(
field=metadata.bigquery.range_partitioning.field,
range_=bigquery.PartitionRange(
start=metadata.bigquery.range_partitioning.range.start,
end=metadata.bigquery.range_partitioning.range.end,
interval=metadata.bigquery.range_partitioning.range.interval,
),
)
if metadata.bigquery and metadata.bigquery.clustering:
table.clustering_fields = metadata.bigquery.clustering.fields
# BigQuery only allows for string type labels with specific requirements to be published:
# https://cloud.google.com/bigquery/docs/labels-intro#requirements
if metadata.labels:
table.labels = {
key: value
for key, value in metadata.labels.items()
if isinstance(value, str)
}