def generate_dag_files()

in common/materializer/generate_assets.py [0:0]


def generate_dag_files(module_name: str, target_dataset_type: str,
                       target_dataset: str, table_name: str,
                       table_setting: dict, table_refresh_sql: str,
                       allow_telemetry: bool, location: str, template_dir: Path,
                       generated_dag_dir: Path) -> None:
    """Generates necessary DAG files to refresh a given table.

    There are two files to be generated:
    1. Python file - this is the main DAG file, and is generated using a
       template.
    2. BigQuery SQL file that the DAG needs to execute to refresh a table.

    Naming schema:
       Dag Name :
         <project>_<dataset>_refresh_<table>
       Dag Full Name (shown in Airflow UI):
         <module>_<dataset_type>_<dag_name>
       Output Directory:
         <dag_dir>/<module>/<dataset_type>/<dag_name>.py
       Python file:
         <output_directory>/<dag_name>.py
       SQL file:
         <output_directory>/sql_scripts/<dag_name>.sql

       e.g.
          dag_dir/cm360/reporting/
              project1_dataset1_refresh_clicks.py
              project1_dataset1_refresh_impressions.py
              sql_scripts/
                  project1_dataset1_refresh_clicks.sql
                  project1_dataset1_refresh_impressions.sql

    Args:
        module_name: Name of module (e.g. "cm360", "sap")
        target_dataset_type: Type of dataset - e.g. "reporting" or "cdc".
        target_dataset: Bigquery dataset including GCP project id.
            e.g. "my_project.my_dataset".
        table_name: Table name to refresh. (e.g. "CustomerMD")
        table_setting: Table Settings as defined in the settings file.
        table_refresh_sql: SQL with logic to populate data in the table.
        allow_telemetry: Bool from Cortex config file to specify if
            telemetry is allowed.
        location: Location to pass to BigQueryInsertJob operators in DAGs.
        template_dir: directory where python dag template is stored.
        generated_dag_dir: directory where generated dag will be materialized.
    """
    dag_name = "_".join(
        [target_dataset.replace(".", "_"), "refresh", table_name])
    dag_full_name = "_".join(
        [module_name.lower(), target_dataset_type, dag_name])

    # Directory to store generated files - e.g. "dag_dir/cm360/reporting/"
    output_dir = Path(generated_dag_dir, module_name.lower(),
                      target_dataset_type)

    # Generate sql file.
    sql_file = Path("sql_scripts", dag_name).with_suffix(".sql")
    output_sql_file = Path(output_dir, sql_file)
    output_sql_file.parent.mkdir(exist_ok=True, parents=True)
    with output_sql_file.open(mode="w+", encoding="utf-8") as sqlf:
        sqlf.write(table_refresh_sql)
    logging.info("Generated DAG SQL file : %s", output_sql_file)

    # Generate python DAG file.
    python_dag_template_file = Path(template_dir,
                                    "airflow_dag_template_reporting.py")
    output_py_file = Path(output_dir, dag_name).with_suffix(".py")

    today = datetime.datetime.now()
    load_frequency = table_setting["load_frequency"]
    # TODO: Figure out a way to do lowercase in string template substitution
    # directly.
    py_subs = {
        "dag_full_name": dag_full_name,
        "lower_module_name": module_name.lower(),
        "lower_tgt_dataset_type": target_dataset_type,
        "query_file": str(sql_file),
        "load_frequency": load_frequency,
        "year": today.year,
        "month": today.month,
        "day": today.day,
        "runtime_labels_dict": "",  # A place holder for label dict string,
        "bq_location": location
    }

    # Add bq_labels to py_subs dict if telemetry allowed
    # Converts CORTEX_JOB_LABEL to str for substitution purposes
    if allow_telemetry:
        py_subs["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)

    if target_dataset_type == "reporting":
        py_subs["tags"] = [module_name.lower(), "reporting"]

    dag_generator.generate_file_from_template(python_dag_template_file,
                                              output_py_file, **py_subs)

    logging.info("Generated dag python file: %s", output_py_file)