in dagify/converter/uf.py [0:0]
def calculate_dag_python_imports(self, dag_divider_key="", dag_divider_value=""):
    """Build the ``from <package> import <names>`` statements for a DAG.

    Gathers the import declarations of every task returned by
    ``self.get_tasks()`` that matches the divider filter, merges them per
    package, and renders one statement per package.

    Args:
        dag_divider_key: Name of the task attribute used to select tasks.
            An empty string (the default) selects every task.
        dag_divider_value: Value the attribute must equal for a task to be
            included when ``dag_divider_key`` is not empty.

    Returns:
        A list of import statements ordered by package name; the names
        imported from each package are de-duplicated and sorted.
    """
    names_by_package = {}
    for task in self.get_tasks():
        if task.get_attribute(dag_divider_key) == dag_divider_value or dag_divider_key == "":
            for task_import in task.get_airflow_task_python_imports():
                # Accumulate into a set owned by this call. Storing the task's
                # own list and then appending to / sorting it would modify the
                # task's import data as a side effect of computing the imports.
                names_by_package.setdefault(task_import['package'], set()).update(
                    task_import['imports']
                )

    # Sort packages and names at render time so the output is deterministic.
    return [
        f"from {package} import {', '.join(sorted(names_by_package[package]))}"
        for package in sorted(names_by_package)
    ]