in dagify/converter/engine.py [0:0]
def generate_airflow_dags(object, task_name):
    """Render one Airflow DAG file per DAG-divider value from the universal format.

    For each distinct value of ``object.dag_divider`` found in the universal
    format (``object.uf``), collect the tasks belonging to that divider,
    resolve their internal/external dependencies (external ones become
    markers and DAG sensors), render the ``dag.tmpl`` Jinja template, and
    write the autopep8-formatted result to ``<output_path>/<divider>.py``.

    Args:
        object: converter engine instance; must expose ``uf`` (universal
            format), ``dag_divider`` and ``output_path``.
        task_name: name of the task attribute used to identify tasks
            (e.g. the job-name attribute) in dependency lookups.

    Raises:
        ValueError: if ``object.uf`` is None (nothing has been loaded).
    """
    if object.uf is None:
        raise ValueError("dagify: no data in universal format. nothing to convert!")

    # Loop-invariant setup: the template environment and output directory do
    # not depend on the divider value, so prepare them once up front.
    environment = Environment(
        loader=FileSystemLoader("./dagify/converter/templates/"))
    template = environment.get_template("dag.tmpl")
    if directory_exists(object.output_path) is False:
        create_directory(object.output_path)

    for dag_divider_value in get_dag_dividers(object):
        airflow_task_outputs = []
        tasks = []
        schedule_interval = None
        # Capture the airflow tasks for each dag divider.
        for task in object.uf.get_tasks():
            if task.get_attribute(object.dag_divider) == dag_divider_value:
                tasks.append(task.get_attribute(task_name))
                airflow_task_outputs.append(task.get_airflow_task_output())
                # First matching task determines the DAG's schedule.
                if not schedule_interval:
                    schedule_interval = calculate_cron_schedule(task)

        # Calculate DAG-specific Python imports.
        dag_python_imports = object.uf.calculate_dag_python_imports(
            dag_divider_key=object.dag_divider,
            dag_divider_value=dag_divider_value
        )

        # Calculate all internal and external task dependencies.
        dependencies = object.uf.generate_dag_dependencies_by_divider(object.dag_divider, task_name)
        dependencies_in_dag_internal = []
        dependencies_in_dag_external = []
        for task in tasks:
            if len(dependencies[dag_divider_value][task]['internal']) > 0:
                dependencies_in_dag_internal.append(object.uf.generate_dag_dependency_statement(task, dependencies[dag_divider_value][task]['internal']))
            for dep in dependencies[dag_divider_value][task]['external']:
                # A cross-DAG dependency: record the owning DAG of the
                # external task plus a randomized marker name to avoid
                # task-id collisions in the rendered file.
                ext_task_uf = object.uf.get_task_by_attr(task_name, dep)
                dependencies_in_dag_external.append({
                    'task_name': task,
                    'ext_dag': ext_task_uf.get_attribute(object.dag_divider),
                    'ext_dep_task': dep,
                    "marker_name": dep + "_marker_" + ''.join(random.choices('0123456789abcdef', k=4))
                })

        # Calculate external upstream dependencies where a task in the current
        # dag depends on another dag's task. Such a dependency will require a
        # DAG Sensor. The approach: iterate over all external dependencies in
        # the dependencies dictionary and identify the tasks that are also in
        # the current dag.
        upstream_dependencies = []
        for _, divider_tasks in dependencies.items():
            for task, int_ext_deps in divider_tasks.items():
                for ext_dep in int_ext_deps["external"]:
                    if ext_dep in tasks:
                        ext_task_uf = object.uf.get_task_by_attr(task_name, task)
                        upstream_dag_name = ext_task_uf.get_attribute(object.dag_divider)
                        upstream_dependencies.append({
                            "task_name": ext_dep,
                            "task_in_upstream_dag": task,
                            "upstream_dag_name": upstream_dag_name,
                            "sensor_name": ext_dep + "_sensor_" + ''.join(random.choices('0123456789abcdef', k=4))
                        })

        # Create one DAG file per divider value.
        filename = f"{object.output_path}/{dag_divider_value}.py"
        content = template.render(
            baseline_imports=get_baseline_imports(object),
            custom_imports=dag_python_imports,
            dag_id=dag_divider_value,
            schedule_interval=schedule_interval,
            tasks=airflow_task_outputs,
            dependencies_int=dependencies_in_dag_internal,
            dependencies_ext=dependencies_in_dag_external,
            upstream_dependencies=upstream_dependencies
        )
        with open(filename, mode="w", encoding="utf-8") as dag_file:
            # Lint the rendered output so generated DAGs are PEP 8-clean.
            dag_file.write(autopep8.fix_code(content))