def register_dags()

in liminal/runners/airflow/dag/liminal_register_dags.py [0:0]


def register_dags(configs_path):
    """
    Registers pipelines defined in liminal yml files found under the given path (recursively) as Airflow DAGs.
    """
    logging.info(f'Registering DAGs from path: {configs_path}')
    config_util = ConfigUtil(configs_path)
    configs = config_util.safe_load(is_render_variables=False)

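    # Snapshot the resolved liminal configs, unless running in the 'jenkins' pod namespace.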
    if os.getenv('POD_NAMESPACE') != "jenkins":
        config_util.snapshot_final_liminal_configs()

    dags = []
    logging.info(f'Found {len(configs)} liminal configs in path: {configs_path}')
    for config in configs:
        name = config.get('name')
        try:
            if not name:
                raise ValueError('liminal.yml missing field `name`')

            logging.info(f"Registering DAGs for {name}")

            owner = config.get('owner')

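            # Pipelines marked `always_run` use Airflow's 'all_done' trigger rule, so tasks run even when upstream tasks fail.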
            trigger_rule = 'all_done' if config.get('always_run') else 'all_success'

            executors = __initialize_executors(config)

            default_executor = airflow.AirflowExecutor(
                'default_executor', liminal_config=config, executor_config={}
            )

            for pipeline in config['pipelines']:
                default_args = __default_args(pipeline)
                dag = __initialize_dag(default_args, pipeline, owner)

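                # Chain tasks sequentially: each applied task becomes the parent of the next one.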
                parent = None

                for task in pipeline['tasks']:
                    task_type = task['type']
                    task_instance = get_task_class(task_type)(
                        task_id=task['task'],
                        dag=dag,
                        parent=parent,
                        trigger_rule=trigger_rule,
                        liminal_config=config,
                        pipeline_config=pipeline,
                        task_config=task,
                        variables=config.get('variables', {}),
                    )

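                    # Prefer the executor named in the task config; otherwise fall back to the default AirflowExecutor.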
                    executor_id = task.get('executor')
                    if executor_id:
                        executor = executors[executor_id]
                    else:
                        logging.info(
                            f"Did not find `executor` in {task['task']} config."
                            f" Using the default executor ({type(default_executor)})"
                            f" instead."
                        )
                        executor = default_executor

                    parent = executor.apply_task_to_dag(task=task_instance)

                logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')

                dags.append((pipeline['pipeline'], dag))

        except Exception:
            logging.error(f'Failed to register DAGs for {name}')
            traceback.print_exc()

    return dags
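
For reference, the loop above implies a minimal shape for each config dict that ConfigUtil.safe_load yields from a liminal.yml file. The sketch below lists only the keys this function actually reads; the `executors` section is an assumption about what __initialize_executors consumes, and real configs carry additional fields (for example the pipeline-level scheduling defaults read by __default_args).

# Illustrative only: a minimal config dict covering the keys read by register_dags.
example_config = {
    'name': 'my_app',                   # required; registration fails without it
    'owner': 'data-team',               # optional, passed to __initialize_dag
    'always_run': False,                # optional; True switches trigger_rule to 'all_done'
    'variables': {},                    # optional, forwarded to every task instance
    'executors': [                      # assumed to be what __initialize_executors consumes
        {'executor': 'k8s', 'type': 'kubernetes'},
    ],
    'pipelines': [
        {
            'pipeline': 'my_pipeline',  # becomes the key in the returned (name, dag) tuples
            'tasks': [
                {'task': 'extract', 'type': 'python'},                  # default executor
                {'task': 'load', 'type': 'python', 'executor': 'k8s'},  # named executor
            ],
        },
    ],
}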
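
register_dags only returns (pipeline_name, dag) tuples, so a module in the Airflow DAGs folder still has to expose the DAG objects at module level for the scheduler to collect them. A minimal sketch of such a loader, assuming the configs directory comes from a LIMINAL_HOME environment variable (an assumption here; the real entry point may resolve the path differently):

# Sketch of a DAGs-folder module that publishes the registered DAGs to Airflow.
import os

from liminal.runners.airflow.dag import liminal_register_dags

# Assumption: liminal configs live under LIMINAL_HOME; adjust path resolution as needed.
configs_path = os.environ.get('LIMINAL_HOME', '/opt/liminal')

# Airflow only collects DAG objects reachable from module-level globals,
# so each registered DAG is exposed under its pipeline name.
for pipeline_name, dag in liminal_register_dags.register_dags(configs_path):
    globals()[pipeline_name] = dag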