def generate_dag_dependencies_by_divider()

in dagify/converter/uf.py [0:0]


    def generate_dag_dependencies_by_divider(self, dag_divider, task_name):
        """Build per-divider internal/external dependency maps for every task.

        Tasks are grouped by the value of their ``dag_divider`` attribute; each
        distinct value becomes a top-level key. Under every divider value there
        is an entry for *every* task (keyed by its ``task_name`` attribute), so
        tasks that belong to other dividers appear with empty lists. Only the
        tasks whose divider value matches get their dependencies filled in:

        - ``internal``: ``task_name`` of each dependent task whose ``dag_name``
          equals the divider value;
        - ``external``: ``task_name`` of each dependent task whose ``dag_name``
          differs from the divider value.

        Resulting structure::

            {
                div1: {
                    task1: {"internal": [task2, task3], "external": [task4]},
                    task2: {"internal": [task1], "external": [task5, task6]},
                },
                div2: {
                    task1: {"internal": [], "external": []},
                    ...
                },
            }

        Args:
            dag_divider: attribute name whose value partitions tasks into DAGs.
            task_name: attribute name whose value identifies each task.

        Returns:
            dict mapping divider value -> task name -> ``{"internal": list,
            "external": list}``. Divider values and task names appear in the
            order they are first seen in ``self.get_tasks()``.
        """
        all_tasks = list(self.get_tasks())
        # Resolve both attributes once per task instead of once per divider value.
        names = [task.get_attribute(task_name) for task in all_tasks]
        dividers = [task.get_attribute(dag_divider) for task in all_tasks]

        # dict.fromkeys keeps first-seen order (a set does not), so the result,
        # and anything generated from it, is stable across runs.
        dependencies = {}
        for divider_value in dict.fromkeys(dividers):
            # Every divider lists every task; setdefault makes tasks that share
            # a name point at a single entry within the divider.
            per_divider = {}
            for name in names:
                per_divider.setdefault(name, {"internal": [], "external": []})
            dependencies[divider_value] = per_divider

        for task, name, divider_value in zip(all_tasks, names, dividers):
            entry = dependencies[divider_value][name]
            # Treat a missing dependency list as "no dependencies".
            for dep in task.get_dependent_tasks() or []:
                bucket = "internal" if dep.get("dag_name") == divider_value else "external"
                entry[bucket].append(dep.get("task_name"))

        return dependencies