in metaflow/plugins/airflow/airflow_utils.py [0:0]
def compile(self):
    """
    Build and return the Airflow `DAG` object for this workflow.

    The method:

    1. validates the installed Airflow version and, when
       `self._metadata["contains_foreach"]` is truthy, also validates
       dynamic-task-mapping and Kubernetes-provider compatibility;
    2. instantiates the `DAG` with the params returned by
       `self._construct_params()` and the keyword arguments stored in
       `self._dag_instantiation_params.arguments`;
    3. walks `self.graph_structure`, creating one task per state via
       `self.states[name].to_task()` and wiring every task to its parents.

    Returns
    -------
    airflow.DAG
        The populated DAG.

    Notes
    -----
    Exceptions raised by the validation helpers are not caught here and
    propagate to the caller.
    """
    from airflow import DAG

    # Airflow 2.0.0 cannot import this, so we have to do it this way.
    # `XComArg` is needed for dynamic task mapping and if the airflow installation is of the right
    # version (+2.3.0) then the class will be importable.
    XComArg = get_xcom_arg_class()

    _validate_minimum_airflow_version()

    if self._metadata["contains_foreach"]:
        _validate_dynamic_mapping_compatibility()
        # We need to verify if KubernetesPodOperator is of version > 4.2.0 to support foreachs / dynamic task mapping.
        # If the dag uses dynamic Task mapping then we throw an error since the `resources` argument in the `KubernetesPodOperator`
        # doesn't work for dynamic task mapping for `KubernetesPodOperator` version < 4.2.0.
        # For more context check this issue : https://github.com/apache/airflow/issues/24669
        _check_foreach_compatible_kubernetes_provider()

    params_dict = self._construct_params()
    # DAG Params can be seen here :
    # https://airflow.apache.org/docs/apache-airflow/2.0.0/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
    # Airflow 2.0.0 Allows setting Params.
    dag = DAG(params=params_dict, **self._dag_instantiation_params.arguments)
    # Only override the file location Airflow recorded when an explicit path was supplied.
    if self._file_path is not None:
        dag.fileloc = self._file_path

    def add_node(node, parents):
        """
        Recursively traverse the specialized graph_structure datastructure.

        `node` is either a state name (`str`) or a `list`:

        * a state name creates the task for that state and sets it
          downstream of every task in `parents`;
        * a list whose items are all lists is a branch: every sub-list is
          attached to the same `parents`, and the tail tasks of all
          branches are collected;
        * any other list is a linear sequence: each item is attached to
          the tasks returned by the item before it.

        Returns the list of tasks that the next node should use as its
        parents (`None` when `node` is neither a `str` nor a `list`).
        """
        if isinstance(node, str):
            task = self.states[node].to_task()
            if parents:
                for parent in parents:
                    # Handle foreach nodes.
                    # NOTE(review): with more than one parent, `expand` is applied again to the
                    # already-expanded task — confirm mapper nodes always have a single parent.
                    if self.states[node].is_mapper_node:
                        task = task.expand(mapper_arr=XComArg(parent))
                    parent >> task
            return [task]  # The new task is the parent of whatever comes next.

        if isinstance(node, list):
            # This means branching since everything within the list is a list.
            if all(isinstance(child, list) for child in node):
                branch_tails = []
                for branch in node:
                    branch_tails.extend(add_node(branch, parents))
                return branch_tails

            # This means no branching: the items are chained one after another.
            tails = parents
            for child in node:
                tails = add_node(child, tails)
            return tails

        # Unsupported node type: nothing is created (same outcome as before, now explicit).
        return None

    with dag:
        parents = None
        for node in self.graph_structure:
            parents = add_node(node, parents)

    return dag