in src/cdc_dag_generator/generate_views.py [0:0]
def process_table(bq_client, table_setting, project_id, raw_dataset,
                  cdc_dataset):
    """Generates the CDC view SQL for one table setting and creates the view.

    Reads the field mapping from `<base_table>.csv` in the schema mapping
    directory, renders the view SQL template into the generated SQL
    directory and, unless the target view or table already exists, executes
    the rendered SQL file.

    Args:
        bq_client: Client object handed to `table_exists` and
            `execute_sql_file`.
        table_setting: Mapping that provides the "base_table" and
            "raw_table" entries for the table being processed.
        project_id: Project used to qualify both the raw table and the view.
        raw_dataset: Dataset that holds the raw table.
        cdc_dataset: Dataset in which the view is created.

    Raises:
        SystemExit: If the raw table does not exist, if the schema mapping
            file cannot be read or lacks a required column, or if creating
            the view fails.
    """
    # The view name is always lower-cased; the raw table name is used as is.
    base_table = table_setting["base_table"].lower()
    raw_table = table_setting["raw_table"]

    source_table = f"{project_id}.{raw_dataset}.{raw_table}"
    target_view = f"{project_id}.{cdc_dataset}.{base_table}"

    if not table_exists(bq_client, source_table):
        logging.error(("Source raw table `%s` doesn't exist! \n"
                       "CDC view cannot be created."), source_table)
        raise SystemExit("⛔️ Failed to deploy CDC views.")

    schema_file = Path(_SCHEMA_MAPPING_DIR, f"{base_table}.csv").absolute()

    logging.info("__ Processing view '%s' __", base_table)

    # Maps source field name -> (target field name, data type).
    sfdc_to_bq_field_map: typing.Dict[str, typing.Tuple[str, str]] = {}

    # A missing/unreadable mapping file or a CSV without the expected columns
    # is reported the same way as every other deployment failure in this
    # function (logged error + SystemExit) instead of a bare traceback.
    try:
        with open(
                schema_file,
                encoding="utf-8",
                newline="",
        ) as csv_file:
            for row in csv.DictReader(csv_file, delimiter=","):
                sfdc_to_bq_field_map[row["SourceField"]] = (
                    row["TargetField"], row["DataType"])
    except (OSError, KeyError, csv.Error) as e:
        logging.error(
            "Failed to read schema mapping file `%s` for view '%s'.\n"
            "ERROR: %s", schema_file, base_table, str(e))
        raise SystemExit(
            "⛔️ Failed to deploy CDC views. Please check the logs.") from e

    # SQL file generation
    #########################

    sql_template_file_name = (_GENERATED_FILE_PREFIX + _TEMPLATE_SQL_NAME +
                              ".sql")
    # Dots in the table name are replaced to keep the file name flat.
    sql_file_name = base_table.replace(".", "_") + "_view.sql"

    sql_template_file = Path(_SQL_TEMPLATE_DIR, sql_template_file_name)
    output_sql_file = Path(_GENERATED_VIEW_SQL_DIR, sql_file_name)

    field_assignments = [
        f"`{source_field}` AS `{target_field}`"
        for source_field, (target_field, _) in sfdc_to_bq_field_map.items()
    ]

    sql_subs = {
        "source_table": source_table,
        "target_view": target_view,
        "field_assignments": ",\n ".join(field_assignments)
    }

    generate_file_from_template(sql_template_file, output_sql_file, **sql_subs)

    logging.info("Generated CDC view SQL file.")

    try:
        # An existing view or table with the target name is never replaced.
        if table_exists(bq_client, target_view):
            logging.warning(("⚠️ View or table %s already exists. "
                             "Skipping it."), target_view)
        else:
            logging.info("Creating view %s", target_view)
            execute_sql_file(bq_client, output_sql_file)
            logging.info("✅ Created CDC view %s.", target_view)
    except Exception as e:
        logging.error("Failed to create CDC view '%s'.\n"
                      "ERROR: %s", target_view, str(e))
        raise SystemExit(
            "⛔️ Failed to deploy CDC views. Please check the logs.") from e

    logging.info("__ View '%s' processed.__", base_table)