in src/common/py_libs/cdc.py [0:0]
def create_cdc_table_from_raw_table(bq_client, table_setting, raw_project,
                                    raw_dataset, cdc_project, cdc_dataset):
    """Ensures the CDC table for a configured base table exists.

    The CDC table is looked up first. If it is already present, a warning
    is logged and nothing else is done. Otherwise the schema of the RAW
    table with the same (lower-cased) name is read, fields whose names are
    listed in _CDC_EXCLUDED_COLUMN_LIST are left out, and a CDC table is
    created from the remaining fields. Partitioning and clustering are
    added to the table definition when the table setting provides
    "partition_details" and / or "cluster_details".

    Args:
        bq_client: BQ Client.
        table_setting: Table config as defined in the settings file. Must
            contain "base_table"; may contain "partition_details" and
            "cluster_details".
        raw_project: BQ Raw project.
        raw_dataset: BQ Raw dataset name.
        cdc_project: BQ CDC project.
        cdc_dataset: BQ CDC dataset name.

    Raises:
        Exception: If the RAW table does not exist.
    """
    base_table = table_setting["base_table"].lower()
    raw_table_name = ".".join([raw_project, raw_dataset, base_table])
    cdc_table_name = ".".join([cdc_project, cdc_dataset, base_table])

    try:
        bq_client.get_table(cdc_table_name)
    except NotFound:
        # The CDC table is missing, so build it from the RAW table schema.
        logger.info("Table '%s' does not exists. Creating it.", cdc_table_name)

        try:
            source_schema = bq_client.get_table(raw_table_name).schema
        except NotFound:
            # "from None" keeps the lookup failures out of the traceback.
            raise Exception(
                f"RAW Table '{raw_table_name}' does not exist.") from None

        # Keep every RAW field except the ones excluded from CDC tables.
        kept_fields = []
        for column in source_schema:
            if column.name in _CDC_EXCLUDED_COLUMN_LIST:
                continue
            kept_fields.append(column)

        table_def = bigquery.Table(cdc_table_name, schema=kept_fields)

        # Add partitioning first, then clustering, when they are specified.
        optional_steps = (
            ("partition_details", add_partition_to_table_def),
            ("cluster_details", add_cluster_to_table_def),
        )
        for setting_key, apply_step in optional_steps:
            step_details = table_setting.get(setting_key)
            if step_details:
                table_def = apply_step(table_def, step_details)

        bq_client.create_table(table_def)
        logger.info("Created table '%s'.", cdc_table_name)
    else:
        logger.warning("Table '%s' already exists. Not creating it again.",
                       cdc_table_name)