in liminal/runners/airflow/dag/liminal_register_dags.py [0:0]
def register_dags(configs_path):
    """
    Registers pipelines in liminal yml files found in given path (recursively) as airflow DAGs.

    :param configs_path: path from which liminal configs are loaded.
    :return: list of ``(pipeline_name, dag)`` tuples, one per registered pipeline.

    A config that fails to register is logged (with its traceback) and skipped, so
    the remaining configs are still processed. Pipelines of that config which were
    registered before the failure remain in the returned list.
    """
    logging.info(f'Registering DAGs from path: {configs_path}')
    config_util = ConfigUtil(configs_path)
    configs = config_util.safe_load(is_render_variables=False)

    # The snapshot of the final configs is skipped when running in the "jenkins" namespace.
    if os.getenv('POD_NAMESPACE') != "jenkins":
        config_util.snapshot_final_liminal_configs()

    dags = []
    logging.info(f'found {len(configs)} liminal configs in path: {configs_path}')

    for config in configs:
        name = config.get('name')
        try:
            if not name:
                raise ValueError('liminal.yml missing field `name`')

            logging.info(f"Registering DAGs for {name}")

            owner = config.get('owner')
            # `always_run` switches every task of this config to the 'all_done' trigger rule.
            trigger_rule = 'all_done' if config.get('always_run') else 'all_success'

            executors = __initialize_executors(config)
            default_executor = airflow.AirflowExecutor(
                "default_executor", liminal_config=config, executor_config={}
            )

            for pipeline in config['pipelines']:
                default_args = __default_args(pipeline)
                dag = __initialize_dag(default_args, pipeline, owner)

                # Whatever the executor returns for one task is handed to the next
                # task as its `parent`.
                parent = None
                for task in pipeline['tasks']:
                    task_instance = get_task_class(task['type'])(
                        task_id=task['task'],
                        dag=dag,
                        parent=parent,
                        trigger_rule=trigger_rule,
                        liminal_config=config,
                        pipeline_config=pipeline,
                        task_config=task,
                        variables=config.get('variables', {}),
                    )

                    executor_id = task.get('executor')
                    if executor_id:
                        executor = executors[executor_id]
                    else:
                        logging.info(
                            f"Did not find `executor` in {task['task']} config."
                            f" Using the default executor ({type(default_executor)})"
                            f" instead."
                        )
                        executor = default_executor

                    parent = executor.apply_task_to_dag(task=task_instance)

                logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
                dags.append((pipeline['pipeline'], dag))
        except Exception:
            # Deliberate best-effort boundary: one broken config must not prevent the
            # others from registering. logging.exception records the traceback with
            # the error message.
            logging.exception(f'Failed to register DAGs for {name}')

    return dags