in src/config_reader.py [0:0]
def process_table(table_config: dict, source_dataset: str, target_dataset: str,
                  gen_test: str, allow_telemetry: bool,
                  bq_location: str) -> None:
    """Creates the CDC table (or runtime view) and DAG files for one table config."""
    try:
        table_name = table_config.get("base_table")
        raw_table = source_dataset + "." + table_name
        logging.info("== Processing table %s ==", raw_table)
        if "target_table" in table_config:
            target_table = table_config["target_table"]
        else:
            target_table = table_name
        cdc_table = target_dataset + "." + target_table
        partition_details = table_config.get("partition_details")
        cluster_details = table_config.get("cluster_details")
        load_frequency = table_config.get("load_frequency")
        if load_frequency == "RUNTIME":
            generate_runtime_view(raw_table, cdc_table)
        else:
            create_cdc_table(raw_table, cdc_table, partition_details,
                             cluster_details)
            # Create the Python and SQL files that will later be used to
            # create a DAG in GCP that refreshes the CDC table from the
            # RAW table.
            logging.info("Generating required files for DAG with %s", cdc_table)
            generate_cdc_dag_files(raw_table, cdc_table, load_frequency,
                                   gen_test, allow_telemetry, bq_location)
        logging.info("✅ == Processed %s ==", raw_table)
    except Exception as e:
        raise SystemExit(
            "⛔️ Error while generating SQL and DAGs. Please check the logs. "
            f"Error = {e} ⛔️"
        ) from e
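
For context, the sketch below shows one way this function might be driven, assuming table configs are read from a CDC settings file elsewhere in the pipeline. The config values, dataset names, and the `__main__` wrapper are illustrative assumptions, not the repository's actual entry point.

# Hypothetical usage sketch: the config keys mirror those read by
# process_table, but every value below is an illustrative assumption.
import logging

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    example_config = {
        "base_table": "example_table",   # required: name of the RAW table
        "target_table": "example_cdc",   # optional: defaults to base_table
        "load_frequency": "@daily",      # "RUNTIME" generates a view instead
        "partition_details": None,       # optional partitioning spec
        "cluster_details": None,         # optional clustering spec
    }

    process_table(
        table_config=example_config,
        source_dataset="my_project.raw_dataset",  # assumed "project.dataset" form
        target_dataset="my_project.cdc_dataset",
        gen_test="true",                           # passed through as a string
        allow_telemetry=False,
        bq_location="US",
    )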