in liminal/runners/airflow/dag/liminal_register_dags.py [0:0]
def __initialize_dag(default_args, pipeline, owner):
pipeline_name = pipeline['pipeline']
schedule_interval = default_args.get('schedule_interval', None)
if not schedule_interval:
schedule_interval = default_args.get('schedule', None)
if owner and 'owner' not in default_args:
default_args['owner'] = owner
start_date = pipeline.get('start_date', datetime.min.time())
if not isinstance(start_date, datetime):
start_date = datetime.combine(start_date, datetime.min.time())
default_args.pop('tasks', None)
default_args.pop('schedule', None)
default_args.pop('monitoring', None)
default_args.pop('schedule_interval', None)
dag = DAG(
dag_id=pipeline_name,
default_args=default_args,
dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
start_date=start_date,
schedule_interval=schedule_interval,
catchup=False,
)
return dag