def process_table()

in src/raw_dag_generator/generate_dags.py [0:0]


def process_table(bq_client, table_config, raw_dataset, raw_project):
    """Generates the extraction DAG file for a table and ensures its raw table.

    Renders the "airflow_dag_sfdc_to_raw.py" template into the generated DAG
    directory. Afterwards, if `table_exists` reports that the raw table is
    missing, the table is created from the table's CSV schema mapping.

    Args:
        bq_client: Client object passed through to `table_exists` and
            `create_table` (presumably a BigQuery client).
        table_config: Mapping that provides the "api_name", "base_table" and
            "load_frequency" entries of the table being processed.
        raw_dataset: Raw dataset name. Used in the DAG file name and in the
            full raw table name.
        raw_project: Raw project name. Used in the DAG file name and in the
            full raw table name.
    """
    api_name = table_config["api_name"]
    base_table = table_config["base_table"].lower()

    logging.info("  Generating files for '%s'", base_table)

    template_path = Path(_TEMPLATE_DIR, "airflow_dag_sfdc_to_raw.py")

    # Dots in the table name are replaced so that the DAG file name
    # only has the single dot of its ".py" suffix.
    dag_file_name = (raw_project + "_" + raw_dataset +
                     "_sfdc_extract_to_raw_" + base_table.replace(".", "_") +
                     ".py")
    dag_path = Path(_GENERATED_DAG_DIR, dag_file_name)

    # The generation date is handed to the template as separate parts.
    now = datetime.datetime.now()
    generate_file_from_template(
        template_path,
        dag_path,
        project_id=raw_project,
        raw_dataset=raw_dataset,
        base_table=base_table,
        api_name=api_name,
        load_frequency=table_config["load_frequency"],
        year=now.year,
        month=now.month,
        day=now.day)

    logging.info("      Generated dag python file")

    raw_table = raw_project + "." + raw_dataset + "." + base_table
    if table_exists(bq_client, raw_table):
        # Nothing more to do: the raw table is already in place.
        return

    logging.info(
        "Raw table %s doesn't exists. "
        "Creating one according to the schema mapping.", raw_table)

    schema_path = Path(_THIS_DIR,
                       f"../table_schema/{base_table}.csv").absolute()
    columns = []
    recordstamp_found = False
    with open(schema_path, encoding="utf-8", newline="") as schema_csv:
        for mapping in csv.DictReader(schema_csv, delimiter=","):
            source_field = mapping["SourceField"]
            target_field = mapping["TargetField"]
            # Case-insensitive match on either side of the mapping.
            if "recordstamp" in (source_field.lower(), target_field.lower()):
                recordstamp_found = True
            # The raw table column takes the source field name.
            columns.append((source_field, mapping["DataType"]))

    # If we handle raw tables, we need Recordstamp field.
    if not recordstamp_found:
        columns.append(("Recordstamp", "TIMESTAMP"))
    create_table(bq_client, raw_table, columns)