in source/generate_dag.py [0:0]
def generate_dag_file(args):
    """Render a DAG Python file from a YAML config and a Jinja template.

    Loads the YAML file at ``args.config_file``, passes the parsed data
    through ``reformat_yaml`` and parses that result again, then renders
    ``templates/<args.dag_template>.template`` (resolved relative to this
    module's directory) and writes the output to
    ``<config dir>/../dags/<dag_id>.py``, creating the ``dags`` directory
    when it is missing.

    Args:
        args: Object exposing ``config_file`` (path to the YAML config) and
            ``dag_template`` (template name without the ``.template``
            suffix). Presumably an ``argparse.Namespace`` -- confirm against
            the caller.

    Raises:
        KeyError: If the parsed config has no ``dag_id`` entry.
        OSError: If the config cannot be read or the DAG file cannot be
            written.

    Errors raised by YAML parsing, by the helper functions called here, or
    by template loading/rendering propagate unchanged.
    """
    config_file = args.config_file
    dag_template = args.dag_template

    # Only the raw read needs the file handle; everything else works on the
    # parsed data. Explicit UTF-8 avoids depending on the platform locale.
    with open(config_file, 'r', encoding="utf-8") as f:
        tmp_config_data = yaml.safe_load(f)

    # Second parsing pass over whatever reformat_yaml produces from the
    # first-pass data (presumably a YAML string -- confirm in reformat_yaml).
    config_data = yaml.safe_load(reformat_yaml(tmp_config_data))
    config_data["config_file_name"] = os.path.basename(config_file)

    config_path = os.path.abspath(config_file)
    file_dir = os.path.dirname(os.path.abspath(__file__))
    template_dir = os.path.join(file_dir, "templates")
    dag_id = config_data['dag_id']

    # Inputs for the template, each derived from the parsed config by a
    # helper defined elsewhere in this module.
    dag_variables = import_variables(yaml_config=config_data)
    python_functions = import_python_functions(yaml_config=config_data)
    task_dependency = validate_create_task_dependency(yaml_config=config_data)

    # Optional section: None when the config has no "task_variables" key.
    var_configs = config_data.get("task_variables")

    print("Config file: {}".format(config_path))
    print("Generating DAG for: {}".format(dag_template))

    # Templates are looked up in the "templates" directory next to this file;
    # the one used is "<dag_template>.template".
    env = Environment(
        loader=FileSystemLoader(template_dir),
        lstrip_blocks=True,
    )
    # Make these helpers callable from inside the template.
    env.globals.update({
        'process_condition': process_condition,
        'raise_exception': raise_exception,
    })
    template = env.get_template(dag_template + ".template")
    framework_config_values = {'var_configs': var_configs}

    # Render BEFORE touching the output file: opening it in 'w' mode
    # truncates it, so a rendering error would otherwise leave an empty or
    # partial DAG file and wipe out a previously generated one.
    rendered_dag = template.render(
        config_data=config_data,
        framework_config_values=framework_config_values,
        python_functions=python_functions,
        task_dependency=task_dependency,
        dag_variables=dag_variables,
    )

    dag_path = os.path.abspath(
        os.path.join(os.path.dirname(config_path), '..', "dags")
    )
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dag_path, exist_ok=True)

    generate_file_name = os.path.join(dag_path, dag_id + '.py')
    # Python source files are UTF-8 by default, so write the DAG as UTF-8
    # regardless of the platform's locale encoding.
    with open(generate_file_name, 'w', encoding="utf-8") as fh:
        fh.write(rendered_dag)

    print("Finished generating file: {}".format(generate_file_name))