def generate_dag_file()

in source/generate_dag.py [0:0]


def generate_dag_file(args):
    """Generate a DAG Python file from a YAML config and a Jinja template.

    The YAML config is parsed, passed through ``reformat_yaml`` and parsed
    again, then enriched with a ``config_file_name`` entry. The template
    ``<dag_template>.template`` is loaded from the ``templates`` directory
    that sits next to this module and rendered with the config and the
    values produced by the helper functions. The result is written to
    ``<config dir>/../dags/<dag_id>.py``; the ``dags`` directory is created
    when it does not exist. Progress is reported with ``print``.

    Args:
        args: Object exposing ``config_file`` (path to the YAML config) and
            ``dag_template`` (template name without the ``.template``
            suffix) as attributes.

    Returns:
        None. The generated file on disk is the only product.

    Raises:
        OSError: If the config file cannot be read or the output cannot be
            written.
        KeyError: If the processed config has no ``dag_id`` entry
            (assumes the processed config is a mapping).
        Exception: Errors raised by the YAML parser, the helper functions
            or the template engine propagate unchanged.
    """
    config_file = args.config_file
    dag_template = args.dag_template

    # Only the raw parse needs the file handle, so release it right away
    # instead of keeping it open during rendering and output writing.
    with open(config_file, 'r') as f:
        tmp_config_data = yaml.safe_load(f)

    # reformat_yaml() is defined elsewhere; its output is parsed as YAML again.
    config_data = yaml.safe_load(reformat_yaml(tmp_config_data))
    config_data["config_file_name"] = os.path.basename(config_file)
    config_path = os.path.abspath(config_file)
    template_dir = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "templates"
    )
    dag_id = config_data['dag_id']

    # Values derived from the config by helpers defined elsewhere in the
    # project (DAG variables, Python functions, task dependencies).
    dag_variables = import_variables(yaml_config=config_data)
    python_functions = import_python_functions(yaml_config=config_data)
    task_dependency = validate_create_task_dependency(yaml_config=config_data)

    # Optional section: None when the config has no "task_variables" key.
    var_configs = config_data.get("task_variables")

    print("Config file: {}".format(config_path))
    print("Generating DAG for: {}".format(dag_template))

    # The template is chosen by the `dag_template` argument (not by the
    # config file) and looked up in the local "templates" directory.
    env = Environment(
        loader=FileSystemLoader(template_dir),
        lstrip_blocks=True,
    )

    # Expose helper callables to the templates as globals.
    env.globals.update({
        'process_condition': process_condition,
        'raise_exception': raise_exception,
    })

    template = env.get_template(dag_template + ".template")
    framework_config_values = {'var_configs': var_configs}

    # Render BEFORE touching the output file: opening with 'w' truncates, so
    # a rendering error must not wipe out a previously generated DAG.
    rendered_dag = template.render(
        config_data=config_data,
        framework_config_values=framework_config_values,
        python_functions=python_functions,
        task_dependency=task_dependency,
        dag_variables=dag_variables,
    )

    dag_path = os.path.abspath(
        os.path.join(os.path.dirname(config_path), '..', "dags")
    )
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dag_path, exist_ok=True)

    generate_file_name = os.path.join(dag_path, dag_id + '.py')
    with open(generate_file_name, 'w') as fh:
        fh.write(rendered_dag)

    print("Finished generating file: {}".format(generate_file_name))