def generate_airflow_dags()

in dagify/converter/engine.py [0:0]


def generate_airflow_dags(object, task_name):
    if object.uf is None:
        raise ValueError("dagify: no data in universal format. nothing to convert!")

    for tIdx, dag_divider_value in enumerate(get_dag_dividers(object)):
        airflow_task_outputs = []
        tasks = []
        schedule_interval = None
        for tIdx, task in enumerate(object.uf.get_tasks()):
            # Capture the airflow tasks for each dag divider
            if task.get_attribute(object.dag_divider) == dag_divider_value:
                tasks.append(task.get_attribute(task_name))
                airflow_task_outputs.append(task.get_airflow_task_output())
                if not schedule_interval:
                    schedule_interval = calculate_cron_schedule(task)

        # Calculate DAG Specific Python Imports
        dag_python_imports = object.uf.calculate_dag_python_imports(
            dag_divider_key=object.dag_divider,
            dag_divider_value=dag_divider_value
        )

        # Calculate all internal and external task dependencies
        dependencies = object.uf.generate_dag_dependencies_by_divider(object.dag_divider, task_name)
        dependencies_in_dag_internal = []
        dependencies_in_dag_external = []
        for task in tasks:
            if len(dependencies[dag_divider_value][task]['internal']) > 0:
                dependencies_in_dag_internal.append(object.uf.generate_dag_dependency_statement(task, dependencies[dag_divider_value][task]['internal']))

            for dep in dependencies[dag_divider_value][task]['external']:
                ext_task_uf = object.uf.get_task_by_attr(task_name, dep)
                dependencies_in_dag_external.append({
                    'task_name': task,
                    'ext_dag': ext_task_uf.get_attribute(object.dag_divider),
                    'ext_dep_task': dep,
                    "marker_name": dep + "_marker_" + ''.join(random.choices('0123456789abcdef', k=4))
                })

        # Calculate external upstream dependencies where a task in the current dag depends on another dag's task
        # Such a dependency will require a DAG Sensor
        # The approach that is implemented is to iterate over all external dependencies in the dependencies dictionary and identify the tasks that
        # are also in the current dag.
        upstream_dependencies = []

        for _, divider_tasks in dependencies.items():
            for task, int_ext_deps in divider_tasks.items():
                ext_deps = int_ext_deps["external"]
                for ext_dep in ext_deps:
                    if ext_dep in tasks:
                        ext_task_uf = object.uf.get_task_by_attr(task_name, task)
                        upstream_dag_name = ext_task_uf.get_attribute(object.dag_divider)

                        upstream_dependencies.append({
                            "task_name": ext_dep,
                            "task_in_upstream_dag": task,
                            "upstream_dag_name": upstream_dag_name,
                            "sensor_name": ext_dep + "_sensor_" + ''.join(random.choices('0123456789abcdef', k=4))
                        })

        # Get DAG Template
        environment = Environment(
            loader=FileSystemLoader("./dagify/converter/templates/"))
        template = environment.get_template("dag.tmpl")

        if directory_exists(object.output_path) is False:
            create_directory(object.output_path)

        # Create DAG File by Folder
        filename = f"{object.output_path}/{dag_divider_value}.py"

        content = template.render(
            baseline_imports=get_baseline_imports(object),
            custom_imports=dag_python_imports,
            dag_id=dag_divider_value,
            schedule_interval=schedule_interval,
            tasks=airflow_task_outputs,
            dependencies_int=dependencies_in_dag_internal,
            dependencies_ext=dependencies_in_dag_external,
            upstream_dependencies=upstream_dependencies
        )
        with open(filename, mode="w", encoding="utf-8") as dag_file:
            content_linted = autopep8.fix_code(content)
            dag_file.write(content_linted)

    return