in src/marketing/src/CM360/src/cdc/deploy_cdc_layer.py [0:0]
def main():
    """Deploy the CDC layer for every table listed in the settings.

    Steps, in order:

    1. Parse the command line arguments and configure logging (DEBUG when
       the parsed ``debug`` flag is set, INFO otherwise).
    2. Create the output directories for the generated files.
    3. For each entry of ``SETTINGS["raw_to_cdc_tables"]``:

       * create the CDC table from its schema mapping CSV, unless
         ``table_exists`` reports that it is already there;
       * generate the DAG python file from ``DAG_TEMPLATE_PATH``;
       * render the CDC SQL script and write it to disk;
       * when ``POPULATE_TEST_DATA`` is truthy, populate the table with
         test data using the generated SQL script.

    4. Copy the DAG config ini file to its output location.
    """
    args = _parse_args(sys.argv[1:])
    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=log_level)

    logging.info("Deploying CDC layer...")
    config_banner = ("\n---------------------------------------\n"
                     "Using the following parameters from config:\n"
                     " RAW_PROJECT = %s \n"
                     " RAW_DATASET = %s \n"
                     " CDC_PROJECT = %s \n"
                     " CDC_DATASET = %s \n"
                     "---------------------------------------\n")
    logging.info(config_banner, RAW_PROJECT, RAW_DATASET, CDC_PROJECT,
                 CDC_DATASET)

    logging.info("Creating required directories for generated files...")
    _create_output_dir_structure()
    _create_sql_output_dir_structure()

    # Every generated DAG shares the same start date: today's date in UTC.
    dag_start_date = datetime.now(timezone.utc).date()

    client = cortex_bq_client.CortexBQClient(project=CDC_PROJECT,
                                             location=PROJECT_LOCATION)

    tables_to_process = SETTINGS["raw_to_cdc_tables"]

    logging.info("Processing CDC tables...")
    for table_cfg in tables_to_process:
        table_name = table_cfg["base_table"]
        logging.info("-- Processing table '%s' --", table_name)

        table_mapping_path = Path(SCHEMA_DIR) / f"{table_name}.csv"
        full_table_name = f"{CDC_PROJECT}.{CDC_DATASET}.{table_name}"

        # --- Table creation (skipped when the table is already there) ---
        if not table_exists(bq_client=client,
                            full_table_name=full_table_name):
            logging.info("Creating table %s...", full_table_name)

            logging.info("Creating schema...")
            schema = create_bq_schema(table_mapping_path)
            # The CDC layer carries an extra account_id column.
            account_id_field = SchemaField(name="account_id",
                                           field_type="INTEGER")
            schema.append(account_id_field)
            logging.debug("Table schema: %s\n", repr_schema(schema))

            create_table_from_schema(
                bq_client=client,
                full_table_name=full_table_name,
                schema=schema,
                partition_details=table_cfg.get("partition_details"),
                cluster_details=table_cfg.get("cluster_details"))

            logging.info("Table %s processed successfully.", full_table_name)
        else:
            logging.warning("❗ Table already exists.")

        # --- DAG python file generation ---
        logging.info("Generating DAG python file...")
        load_frequency = table_cfg["load_frequency"]
        # The labels substitution stays empty unless telemetry is opted in,
        # in which case it holds the CORTEX JOB LABEL dict as a string.
        if client.allow_telemetry:
            runtime_labels = str(constants.CORTEX_JOB_LABEL)
        else:
            runtime_labels = ""
        dag_subs = {
            "project_id": CDC_PROJECT,
            "cdc_dataset": CDC_DATASET,
            "cdc_sql_path": Path("cdc_sql_scripts") / f"{table_name}.sql",
            "load_frequency": load_frequency,
            "table_name": table_name,
            "start_date": dag_start_date,
            "runtime_labels_dict": runtime_labels,
            "bq_location": PROJECT_LOCATION,
        }
        generate_dag_from_template(
            template_file=DAG_TEMPLATE_PATH,
            generation_target_directory=OUTPUT_DIR_FOR_CDC,
            table_name=table_name,
            project_id=CDC_PROJECT,
            dataset_id=CDC_DATASET,
            layer="raw_to_cdc",
            subs=dag_subs)
        logging.info("Generated DAG python file.")

        # --- DAG SQL file generation ---
        logging.info("Generating DAG SQL file...")
        row_identifiers = table_cfg["row_identifiers"]
        sql_template_file = _get_sql_template(table_name=table_name)
        sql_subs = {
            "source_project_id": RAW_PROJECT,
            "target_project_id": CDC_PROJECT,
            "row_identifiers": row_identifiers,
            "source_ds": RAW_DATASET,
            "target_ds": CDC_DATASET,
            "table": table_name,
        }
        sql_code = render_template_file(
            template_path=Path(CDC_SQL_TEMPLATE) / sql_template_file,
            mapping_path=table_mapping_path,
            subs=sql_subs)

        generated_sql_path = (Path(CDC_SQL_SCRIPTS_OUTPUT_DIR) /
                              f"{table_name}.sql")
        write_generated_sql_to_disk(path=generated_sql_path,
                                    generated_sql=sql_code)
        logging.info("Generated DAG SQL file: %s", generated_sql_path)

        # --- Optional test data, loaded with the script generated above ---
        if POPULATE_TEST_DATA:
            logging.info("Populating table with test data...")
            _populate_test_data(client=client,
                                path_to_script=generated_sql_path,
                                full_table_name=full_table_name)
            logging.info("Test data populated!")

        logging.info("Table processed successfully.")
        logging.info("----------------------------")

    logging.info("Processed all tables successfully.")

    logging.info("Copying DAG config file...")
    shutil.copyfile(src=DAG_CONFIG_INI_INPUT_PATH,
                    dst=DAG_CONFIG_INI_OUTPUT_PATH)
    logging.info("DAG config file copied successfully.")

    logging.info("✅ CDC layer deployed successfully!")