in dagify/converter/uf.py [0:0]
def generate_dag_dependencies_by_divider(self, dag_divider, task_name):
    """Group every task's dependencies into internal/external lists per DAG divider.

    Tasks are partitioned into DAGs by the value of their ``dag_divider``
    attribute. Each entry returned by ``task.get_dependent_tasks()`` is
    classified relative to the task's own divider value:

    * ``"internal"`` -- the entry's ``"dag_name"`` equals the task's divider value;
    * ``"external"`` -- any other ``"dag_name"`` (including a missing one).

    The following structure is produced::

        dependencies = {
            div1: {
                task1: {"internal": [task2, task3, ...], "external": [task4, task5, ...]},
                task2: {"internal": [task1, task3, ...], "external": [task5, task6, ...]},
            },
            div2: {
                task1: {"internal": [...], "external": [...]},
                task2: {"internal": [...], "external": [...]},
            },
        }

    Every task name is registered under *every* divider value. Tasks that
    belong to a different divider keep empty lists there; only a task's own
    divider receives its dependencies.

    Args:
        dag_divider: Name of the task attribute whose value selects the DAG.
        task_name: Name of the task attribute holding the task's name.

    Returns:
        dict mapping divider value -> task name -> ``{"internal": [...],
        "external": [...]}``. The lists hold each dependency's ``"task_name"``
        in the order ``get_dependent_tasks()`` returned them. Divider keys
        appear in the order their values are first seen in ``get_tasks()``.
    """
    # Read each task's name and divider value once, instead of re-scanning
    # every task for every divider value.
    task_rows = [
        (task, task.get_attribute(task_name), task.get_attribute(dag_divider))
        for task in self.get_tasks()
    ]

    # dict.fromkeys de-duplicates while keeping first-seen order. Iterating a
    # set of strings follows hash order, which can change from run to run.
    dag_divider_values = list(dict.fromkeys(divider for _, _, divider in task_rows))

    # Register every task name under every divider value, with empty lists.
    dependencies = {}
    for dag_divider_value in dag_divider_values:
        tasks_for_divider = dependencies.setdefault(dag_divider_value, {})
        for _, current_task_name, _ in task_rows:
            tasks_for_divider.setdefault(current_task_name, {"internal": [], "external": []})

    # Only a task's own divider receives its dependencies.
    for task, current_task_name, task_divider in task_rows:
        entry = dependencies[task_divider][current_task_name]
        # Treat a task reporting ``None`` as having no dependencies.
        for dep in task.get_dependent_tasks() or []:
            # Internal when the dependency lives in the same DAG as the task.
            bucket = "internal" if dep.get("dag_name") == task_divider else "external"
            entry[bucket].append(dep.get("task_name"))

    return dependencies