def process_table()

in src/SFDC/src/cdc_dag_generator/generate_dags.py [0:0]


def _resolve_mapped_field(field_map, source_name_lower, target_name_lower,
                          error_message):
    """Locates a required field in the source-to-target field map.

    The field is searched case-insensitively among the source (RAW) field
    names first; if absent there, we fall back to searching the target (CDC)
    field names.

    Args:
        field_map: Dict mapping source field name -> (target field, data type).
        source_name_lower: Lower-cased source field name to look for.
        target_name_lower: Lower-cased target field name to fall back to.
        error_message: Message for the ValueError raised when neither matches.

    Returns:
        Tuple (source field name, target field name) as spelled in `field_map`.

    Raises:
        ValueError: If the field cannot be located by either name.
    """
    source_fields = list(field_map.keys())
    source_fields_lower = [f.lower() for f in source_fields]
    if source_name_lower in source_fields_lower:
        index = source_fields_lower.index(source_name_lower)
    else:
        # No such field in RAW. Trying to find one via the target name.
        target_fields_lower = [v[0].lower() for v in field_map.values()]
        if target_name_lower not in target_fields_lower:
            raise ValueError(error_message)
        index = target_fields_lower.index(target_name_lower)
    source_field = source_fields[index]
    return source_field, field_map[source_field][0]


def process_table(bq_client, table_setting, project_id, raw_dataset,
                  cdc_dataset, load_test_data, location):
    """For a given table config, creates required tables as well as
    dag and related files.

    Args:
        bq_client: BigQuery client used for table creation and SQL execution.
        table_setting: Per-table config dict with "base_table", "raw_table",
            "api_name" and "load_frequency" keys.
        project_id: GCP project id hosting the RAW and CDC datasets.
        raw_dataset: Name of the RAW (source) dataset.
        cdc_dataset: Name of the CDC (target) dataset.
        load_test_data: If "true" (string, case-insensitive), the generated
            SQL is executed once to seed the CDC table from RAW data.
        location: BigQuery location used in the generated DAG.

    Raises:
        ValueError: If the schema CSV lacks usable Id / SystemModstamp fields.
        SystemExit: If CDC table creation or test-data population fails.
    """

    base_table = table_setting["base_table"].lower()
    raw_table = table_setting["raw_table"]
    api_name = table_setting["api_name"]

    schema_file = Path(_THIS_DIR,
                       f"../table_schema/{base_table}.csv").absolute()

    logging.info("__ Processing table '%s' __", base_table)

    # Maps source (RAW) field name -> (target (CDC) field name, data type).
    sfdc_to_bq_field_map: typing.Dict[str, typing.Tuple[str, str]] = {}

    # TODO: Check Config File schema.
    with open(
            schema_file,
            encoding="utf-8",
            newline="",
    ) as csv_file:
        for row in csv.DictReader(csv_file, delimiter=","):
            sfdc_to_bq_field_map[row["SourceField"]] = (row["TargetField"],
                                                        row["DataType"])

    # Making sure important fields are in the destination.
    # Id may be absent in RAW, in which case it must be mapped to the
    # {api_name}Id field in CDC.
    source_id_field_name, id_field_name = _resolve_mapped_field(
        sfdc_to_bq_field_map, "id", f"{api_name}id".lower(),
        (f"Cannot find the Id field for {base_table}. "
         f"It must be mapped to {api_name}Id field in CDC."))
    # SystemModstamp is required as well (drives the CDC merge).
    source_sms_field_name, sms_field_name = _resolve_mapped_field(
        sfdc_to_bq_field_map, "systemmodstamp", "systemmodstamp",
        (f"Cannot find the SystemModstamp field for {base_table}. "
         "It must be mapped to SystemModstamp field in CDC."))

    # Create CDC table if needed.
    #############################
    try:
        create_cdc_table(bq_client, table_setting, project_id, cdc_dataset,
                         list(sfdc_to_bq_field_map.values()))
    except Exception as e:
        logging.error("Failed while processing table '%s'.\n"
                      "ERROR: %s", base_table, str(e))
        raise SystemExit(
            "⛔️ Failed to deploy CDC. Please check the logs.") from e

    generated_file_prefix = project_id + "_" + cdc_dataset + "_raw_to_cdc_"

    # Python file generation
    #########################
    python_template_file = Path(_TEMPLATE_DIR, "airflow_dag_raw_to_cdc.py")
    output_py_file_name = (generated_file_prefix +
                           base_table.replace(".", "_") + ".py")
    output_py_file = Path(_GENERATED_DAG_DIR, output_py_file_name)

    today = datetime.datetime.now()
    load_frequency = table_setting["load_frequency"]

    py_subs = {
        "project_id": project_id,
        "raw_dataset": raw_dataset,
        "cdc_dataset": cdc_dataset,
        "base_table": base_table,
        "load_frequency": load_frequency,
        "year": today.year,
        "month": today.month,
        "day": today.day,
        "runtime_labels_dict": "",  # A placeholder for the label dict.
        "location": location
    }

    # Add bq_labels to py_subs dict if telemetry is allowed.
    # Convert CORTEX_JOB_LABEL dict to str for substitution purposes.
    if bq_client.allow_telemetry:
        py_subs["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)

    generate_file_from_template(python_template_file, output_py_file, **py_subs)

    logging.info("Generated dag python files")

    # SQL file generation
    #########################
    sql_template_file_name = (_TEMPLATE_FILE_PREFIX + _TEMPLATE_SQL_NAME +
                              ".sql")
    sql_file_name = (base_table.replace(".", "_") +
                     ".sql")
    sql_template_file = Path(_SQL_TEMPLATE_DIR, sql_template_file_name)
    output_sql_file = Path(_GENERATED_DAG_SQL_DIR, sql_file_name)

    field_assignments = [
        f"`{source}` AS `{target[0]}`"
        for source, target in sfdc_to_bq_field_map.items()
    ]
    target_fields = [f"`{f[0]}`" for f in sfdc_to_bq_field_map.values()]

    sql_subs = {
        "source_table": project_id + "." + raw_dataset + "." + raw_table,
        "target_table": project_id + "." + cdc_dataset + "." + base_table,
        "source_id": source_id_field_name,
        "target_id": id_field_name,
        "source_systemmodstamp": source_sms_field_name,
        "target_systemmodstamp": sms_field_name,
        "target_fields": ",\n  ".join(target_fields),
        "field_assignments": ",\n  ".join(field_assignments)
    }

    generate_file_from_template(sql_template_file, output_sql_file, **sql_subs)
    logging.info("Generated DAG SQL file.")

    # If test data is needed, we want to populate the CDC table from data in
    # the RAW tables. Let's use the DAG SQL file to do that.
    if str(load_test_data).lower() == "true":
        try:
            execute_sql_file(bq_client, output_sql_file)
            logging.info("Populated CDC table with test data.")
        except Exception as e:
            logging.error("Failed to populate CDC table '%s'.\n"
                          "ERROR: %s", (cdc_dataset + "." + base_table), str(e))
            raise SystemExit(
                "⛔️ Failed to deploy CDC. Please check the logs.") from e

    logging.info("__ Table '%s' processed.__", base_table)