# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Library for materializer related functions."""
import logging
from google.cloud import bigquery
logger = logging.getLogger(__name__)
# Entry types supported by materializer.
_MATERIALIZER_ENTRY_TYPES = ["table", "view", "script", "k9_dawg"]

# SQL entry types supported by materializer.
_SQL_ENTRY_TYPES = ["view", "table", "script"]

# Frequencies at which to refresh a CDC table. These values correspond to
# Apache Airflow / Cloud Composer DAG schedule interval values.
_LOAD_FREQUENCIES = [
    "runtime", "None", "@once", "@hourly", "@daily", "@weekly", "@monthly",
    "@yearly"
]

# Supported partition types.
_PARTITION_TYPES = ["time", "integer_range"]

# Supported grains for time based partitioning.
_TIME_PARTITION_GRAIN_LIST = ["hour", "day", "month", "year"]

# Dict mapping time grain strings to BigQuery time partitioning types.
_TIME_PARTITION_GRAIN_DICT = {
    "hour": bigquery.TimePartitioningType.HOUR,
    "day": bigquery.TimePartitioningType.DAY,
    "month": bigquery.TimePartitioningType.MONTH,
    "year": bigquery.TimePartitioningType.YEAR
}

# Column data types supported for time based partitioning.
_TIME_PARTITION_DATA_TYPES = ["DATE", "TIMESTAMP", "DATETIME"]
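
# Illustrative shapes of the partition and cluster settings this module
# validates (a sketch; the column names and values below are hypothetical):
#
#   partition_details = {"column": "created_at",
#                        "partition_type": "time",
#                        "time_grain": "day"}
#
#   partition_details = {"column": "order_id",
#                        "partition_type": "integer_range",
#                        "integer_range_bucket": {"start": 0,
#                                                 "end": 100000,
#                                                 "interval": 1000}}
#
#   cluster_details = {"columns": ["region", "status"]}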


def validate_cluster_columns(cluster_details, target_schema):
    """Checks schema to make sure columns are appropriate for clustering."""
    cluster_columns = cluster_details["columns"]
    for column in cluster_columns:
        cluster_column_details = [
            field for field in target_schema if field.name == column
        ]
        if not cluster_column_details:
            raise ValueError(
                f"Column '{column}' specified for clustering does not exist "
                "in the table.")


def validate_partition_columns(partition_details, target_schema):
    """Checks schema to make sure columns are appropriate for partitioning."""
    column = partition_details["column"]
    partition_column_details = [
        field for field in target_schema if field.name == column
    ]
    if not partition_column_details:
        raise ValueError(
            f"Column '{column}' specified for partitioning does not exist in "
            "the table.")
    # Since there will be only one value in the list (a column exists only
    # once in a table), just use the first value from the list.
    partition_column_type = partition_column_details[0].field_type
    partition_type = partition_details["partition_type"]
    if (partition_type == "time" and
            partition_column_type not in _TIME_PARTITION_DATA_TYPES):
        raise ValueError(
            "For 'partition_type' = 'time', partitioning column has to be "
            "one of the following data types: "
            f"{_TIME_PARTITION_DATA_TYPES}.\n"
            f"But column '{column}' is of '{partition_column_type}' type.")
    if (partition_type == "integer_range" and
            partition_column_type != "INTEGER"):
        raise ValueError(
            "For 'partition_type' = 'integer_range', partitioning column "
            f"has to be of INTEGER data type. But column '{column}' is "
            f"of '{partition_column_type}' type.")


def add_cluster_to_table_def(table_def, cluster_details):
    """Adds clustering clause to a BigQuery table definition."""
    validate_cluster_columns(cluster_details, table_def.schema)
    table_def.clustering_fields = cluster_details["columns"]
    return table_def
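
# Illustrative usage of add_cluster_to_table_def (a sketch; the table name
# and schema are hypothetical):
#
#   schema = [bigquery.SchemaField("region", "STRING"),
#             bigquery.SchemaField("status", "STRING")]
#   table = bigquery.Table("my-project.my_dataset.my_table", schema=schema)
#   table = add_cluster_to_table_def(table,
#                                    {"columns": ["region", "status"]})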


def add_partition_to_table_def(table_def, partition_details):
    """Adds partitioning clause to a BigQuery table definition."""
    validate_partition_columns(partition_details, table_def.schema)
    if partition_details["partition_type"] == "time":
        time_partition_grain = partition_details["time_grain"]
        table_def.time_partitioning = bigquery.TimePartitioning(
            field=partition_details["column"],
            type_=_TIME_PARTITION_GRAIN_DICT[time_partition_grain])
    else:
        integer_range_bucket = partition_details["integer_range_bucket"]
        bucket_start = integer_range_bucket["start"]
        bucket_end = integer_range_bucket["end"]
        bucket_interval = integer_range_bucket["interval"]
        table_def.range_partitioning = bigquery.RangePartitioning(
            field=partition_details["column"],
            range_=bigquery.PartitionRange(start=bucket_start,
                                           end=bucket_end,
                                           interval=bucket_interval))
    return table_def
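
# Illustrative usage of add_partition_to_table_def (a sketch; the table name
# and schema are hypothetical):
#
#   schema = [bigquery.SchemaField("created_at", "TIMESTAMP")]
#   table = bigquery.Table("my-project.my_dataset.my_table", schema=schema)
#   table = add_partition_to_table_def(table,
#                                      {"column": "created_at",
#                                       "partition_type": "time",
#                                       "time_grain": "day"})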


def validate_cluster_details(cluster_details):
    """Makes sure cluster details for a table are valid."""
    cluster_columns = cluster_details.get("columns")
    if not cluster_columns:
        raise ValueError(
            "'columns' property missing from 'cluster_details' property.")
    if not isinstance(cluster_columns, list):
        raise ValueError(
            "'columns' property in 'cluster_details' has to be a List.")
    if len(cluster_columns) > 4:
        raise ValueError(
            "More than 4 columns specified in 'cluster_details' property. "
            "BigQuery supports a maximum of 4 clustering columns per table.")


def validate_partition_details(partition_details):
    """Makes sure partition details for a table are valid."""
    partition_column = partition_details.get("column")
    if not partition_column:
        raise ValueError(
            "Partition 'column' property missing from 'partition_details' "
            "property.")
    partition_type = partition_details.get("partition_type")
    if not partition_type:
        raise ValueError(
            "'partition_type' property missing from 'partition_details' "
            "property.")
    if partition_type not in _PARTITION_TYPES:
        raise ValueError("'partition_type' has to be one of the following: "
                         f"{_PARTITION_TYPES}.\n"
                         f"Specified 'partition_type' is '{partition_type}'.")
    if partition_type == "time":
        time_partition_grain = partition_details.get("time_grain")
        if not time_partition_grain:
            raise ValueError(
                "'time_grain' property missing for 'time' based partition.")
        if time_partition_grain not in _TIME_PARTITION_GRAIN_LIST:
            raise ValueError(
                "'time_grain' property has to be one of the following: "
                f"{_TIME_PARTITION_GRAIN_LIST}.\n"
                f"Specified 'time_grain' is '{time_partition_grain}'.")
    if partition_type == "integer_range":
        integer_range_bucket = partition_details.get("integer_range_bucket")
        if not integer_range_bucket:
            raise ValueError(
                "'integer_range_bucket' property missing for 'integer_range' "
                "based partition.")
        bucket_start = integer_range_bucket.get("start")
        bucket_end = integer_range_bucket.get("end")
        bucket_interval = integer_range_bucket.get("interval")
        if (bucket_start is None or bucket_end is None or
                bucket_interval is None):
            raise ValueError(
                "'start', 'end' or 'interval' property missing for the "
                "'integer_range_bucket' property.")


def validate_table_setting(table_setting):
    """Makes sure the materializer setting for a table is valid."""
    if not table_setting:
        raise ValueError("Missing 'table_setting' section. This section is "
                         "mandatory when BQ object type is 'table'.")
    logger.debug("table_setting:\n%s", table_setting)
    load_frequency = table_setting.get("load_frequency")
    dag_setting = table_setting.get("dag_setting")
    if not load_frequency and not dag_setting:
        raise ValueError("Missing 'load_frequency' and 'dag_setting' "
                         "properties. At least one must be set.")
    if load_frequency and dag_setting and dag_setting.get("parents"):
        raise ValueError("Both 'load_frequency' and 'dag_setting.parents' "
                         "are set. Only one must be set.")
    # TODO: add cron validation
    # if load_frequency not in _LOAD_FREQUENCIES:
    #     raise ValueError("'load_frequency' has to be one of the following: "
    #                      f"{_LOAD_FREQUENCIES}.\n"
    #                      f"Specified 'load_frequency' is '{load_frequency}'.")
    partition_details = table_setting.get("partition_details")
    cluster_details = table_setting.get("cluster_details")
    # Validate partition and cluster details.
    if partition_details:
        validate_partition_details(partition_details)
    if cluster_details:
        validate_cluster_details(cluster_details)
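
# Illustrative shape of a valid 'table_setting' section (a sketch; the
# values are hypothetical):
#
#   table_setting = {
#       "load_frequency": "@daily",
#       "partition_details": {"column": "created_at",
#                             "partition_type": "time",
#                             "time_grain": "day"},
#       "cluster_details": {"columns": ["region"]}
#   }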


def validate_bq_materializer_settings(materializer_settings):
    """Validates all BQ materializer settings."""
    logger.info("Validating all BQ materializer settings ...")
    logger.debug("materializer_settings = %s", materializer_settings)
    sql_files_processed = set()
    k9s_processed = set()
    for entry_setting in materializer_settings:
        entry_type = entry_setting.get("type")
        if not entry_type:
            raise ValueError("Missing 'type' property.")
        if entry_type not in _MATERIALIZER_ENTRY_TYPES:
            raise ValueError("'type' has to be one of the following: "
                             f"{_MATERIALIZER_ENTRY_TYPES}.\n"
                             f"Specified 'type' is '{entry_type}'.")
        if entry_type in _SQL_ENTRY_TYPES:
            sql_file = entry_setting.get("sql_file")
            if not sql_file:
                raise ValueError("'sql_file' property missing "
                                 "from an SQL entry.")
            logger.debug(" Checking setting for file '%s' ...", sql_file)
            # Check for duplicate entries.
            if sql_file in sql_files_processed:
                raise ValueError(f"File '{sql_file}' is present "
                                 "multiple times.")
            sql_files_processed.add(sql_file)
        if entry_type == "table":
            table_setting = entry_setting.get("table_setting")
            validate_table_setting(table_setting)
            # The validation above allows the "runtime" frequency, which is
            # not allowed for materializer. Do one additional check.
            load_frequency = table_setting.get("load_frequency")
            if load_frequency == "runtime":
                raise ValueError("'load_frequency' cannot be 'runtime'.")
        elif entry_type == "k9_dawg":
            k9_id = entry_setting.get("k9_id")
            if not k9_id:
                raise ValueError("'k9_id' property missing from a K9 entry.")
            logger.debug(" Checking setting for local K9 '%s' ...", k9_id)
            # Check for duplicate entries.
            if k9_id in k9s_processed:
                raise ValueError(f"K9 '{k9_id}' is present multiple times.")
            k9s_processed.add(k9_id)
    logger.info("BQ Materializer settings look good.")