def __initialize_dag()

in liminal/runners/airflow/dag/liminal_register_dags.py [0:0]


def __initialize_dag(default_args, pipeline, owner):
    """
    Build the DAG object for a single pipeline configuration.

    ``default_args`` is modified in place: ``owner`` is added when it is
    truthy and no ``'owner'`` key exists yet, and the keys ``'tasks'``,
    ``'schedule'``, ``'monitoring'`` and ``'schedule_interval'`` are removed
    before the dict is handed to ``DAG``.

    :param default_args: dict passed to ``DAG`` as ``default_args``; may carry
        the schedule under ``'schedule_interval'`` or ``'schedule'``.
    :param pipeline: pipeline configuration; must contain ``'pipeline'``
        (used as the dag id) and ``'timeout_minutes'``; may contain
        ``'start_date'``.
    :param owner: fallback owner, used only if ``default_args`` has none.
    :return: the constructed ``DAG`` (created with ``catchup=False``).
    :raises KeyError: if ``pipeline`` lacks ``'pipeline'`` or
        ``'timeout_minutes'``.
    """
    pipeline_name = pipeline['pipeline']

    # 'schedule_interval' takes precedence; 'schedule' is the fallback key.
    schedule_interval = default_args.get('schedule_interval') or default_args.get('schedule')

    if owner and 'owner' not in default_args:
        default_args['owner'] = owner

    # The fallback must be a datetime (not a time): a time object would fail
    # the isinstance check below and make datetime.combine raise TypeError.
    start_date = pipeline.get('start_date', datetime.min)
    if not isinstance(start_date, datetime):
        # Non-datetime values (e.g. a plain date) are promoted to midnight
        # of that day.
        start_date = datetime.combine(start_date, datetime.min.time())

    # Drop these keys before default_args is passed on to DAG.
    for key in ('tasks', 'schedule', 'monitoring', 'schedule_interval'):
        default_args.pop(key, None)

    dag = DAG(
        dag_id=pipeline_name,
        default_args=default_args,
        dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
        start_date=start_date,
        schedule_interval=schedule_interval,
        catchup=False,
    )

    return dag