def compile()

in metaflow/plugins/airflow/airflow_utils.py [0:0]


    def compile(self):
        from airflow import DAG

        # Airflow 2.0.0 cannot import `XComArg`, so we resolve it through a helper instead
        # of importing it directly. `XComArg` is needed for dynamic task mapping and is only
        # importable when the Airflow installation is recent enough (2.3.0+).
        XComArg = get_xcom_arg_class()

        _validate_minimum_airflow_version()

        if self._metadata["contains_foreach"]:
            _validate_dynamic_mapping_compatibility()
            # Foreaches rely on dynamic task mapping, which requires KubernetesPodOperator
            # version > 4.2.0: for older provider versions the `resources` argument of
            # `KubernetesPodOperator` does not work with dynamically mapped tasks, so we
            # throw an error instead of producing a broken DAG.
            # For more context see: https://github.com/apache/airflow/issues/24669
            _check_foreach_compatible_kubernetes_provider()

        params_dict = self._construct_params()
        # DAG params are documented here:
        # https://airflow.apache.org/docs/apache-airflow/2.0.0/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
        # Setting params is supported from Airflow 2.0.0 onwards.
        dag = DAG(params=params_dict, **self._dag_instantiation_params.arguments)
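        # `fileloc` is the file path Airflow associates with this DAG; point it at the
        # generated DAG file when a path was provided.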
        dag.fileloc = self._file_path if self._file_path is not None else dag.fileloc

        def add_node(node, parents, dag):
            """
            A recursive function to traverse the specialized
            graph_structure datastructure.
            """
            if type(node) == str:
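                # Materialize the Airflow operator for this Metaflow state.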
                task = self.states[node].to_task()
                if parents:
                    for parent in parents:
                        # Handle foreach nodes.
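                        # `expand` uses Airflow's dynamic task mapping (2.3.0+): one mapped
                        # task instance is created per element of the parent's XCom output,
                        # referenced via `XComArg`.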
                        if self.states[node].is_mapper_node:
                            task = task.expand(mapper_arr=XComArg(parent))
                        parent >> task
                return [task]  # Return the task so it becomes the parent of downstream nodes.

            # A list is either a set of parallel branches or a linear chain of nodes.
            if type(node) == list:
                # Branching: every element is itself a list, i.e. a parallel branch.
                if all(isinstance(n, list) for n in node):
                    curr_parents = parents
                    parent_list = []
                    for node_list in node:
                        last_parent = add_node(node_list, curr_parents, dag)
                        parent_list.extend(last_parent)
                    return parent_list
                else:
                    # No branching: the elements are traversed in order, with each
                    # step's tasks becoming the parents of the next.
                    curr_parents = parents
                    for node_x in node:
                        curr_parents = add_node(node_x, curr_parents, dag)
                    return curr_parents

        with dag:
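            # Walk the top-level graph_structure; the tasks returned for each node
            # become the parents of the next one.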
            parent = None
            for node in self.graph_structure:
                parent = add_node(node, parent, dag)

        return dag
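
For illustration, here is a minimal, Airflow-free sketch of the graph_structure convention that add_node assumes: a string is a single node, a flat list is a linear chain, and a list whose elements are all lists is a set of parallel branches. The graph below is hypothetical; the helper only prints the edges that compile() would wire up with `>>`.

    def walk(node, parents):
        # A string is a single node: wire it to every parent and return it as the new parent.
        if isinstance(node, str):
            for parent in parents or []:
                print(f"{parent} >> {node}")
            return [node]
        if isinstance(node, list):
            if all(isinstance(n, list) for n in node):
                # Branching: every branch starts from the same parents; collect all leaf tasks.
                leaves = []
                for branch in node:
                    leaves.extend(walk(branch, parents))
                return leaves
            # Linear chain: each element's tasks become the parents of the next element.
            for child in node:
                parents = walk(child, parents)
            return parents

    graph_structure = ["start", [["a", "a_next"], ["b"]], "join", "end"]
    parents = None
    for node in graph_structure:
        parents = walk(node, parents)
    # Prints:
    #   start >> a
    #   a >> a_next
    #   start >> b
    #   a_next >> join
    #   b >> join
    #   join >> end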