def compile()

in metaflow/plugins/airflow/airflow.py [0:0]


    def compile(self):
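        """Compile the Metaflow flow graph into an Airflow DAG definition."""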
        if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
            "trigger_on_finish"
        ):
            raise AirflowException(
                "Deploying flows with @trigger or @trigger_on_finish decorator(s) "
                "to Airflow is not supported currently."
            )

        # Visit every node of the flow and recursively build the state machine.
        def _visit(node, workflow, exit_node=None):
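            # Every step is expected to carry a @kubernetes decorator when
            # compiling for Airflow; pull its attributes so unsupported
            # options can be rejected up front.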
            kube_deco = dict(
                [deco for deco in node.decorators if deco.name == "kubernetes"][
                    0
                ].attributes
            )
            if kube_deco:
                # Reject @kubernetes attributes that are not yet supported on Airflow:
                # tmpfs settings, persistent volume claims, and image pull policy.
                for attr in [
                    "use_tmpfs",
                    "tmpfs_size",
                    "persistent_volume_claims",
                    "image_pull_policy",
                ]:
                    if kube_deco[attr]:
                        raise AirflowException(
                            "The decorator attribute *%s* is currently not supported on Airflow "
                            "for the @kubernetes decorator on step *%s*"
                            % (attr, node.name)
                        )

            parent_is_foreach = any(  # Any immediate parent is a foreach node.
                self.graph[n].type == "foreach" for n in node.in_funcs
            )
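            # Steps whose immediate parent is a foreach are compiled as mapper
            # nodes, i.e. one mapped task instance per foreach split.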
            state = AirflowTask(
                node.name, is_mapper_node=parent_is_foreach
            ).set_operator_args(**self._to_job(node))
            if node.type == "end":
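                # The end step is a leaf: add its task and stop recursing.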
                workflow.add_state(state)

            # Continue linear assignment within the (sub)workflow if the node
            # doesn't branch or fork.
            elif node.type in ("start", "linear", "join", "foreach"):
                workflow.add_state(state)
                _visit(
                    self.graph[node.out_funcs[0]],
                    workflow,
                )

            elif node.type == "split":
                workflow.add_state(state)
                for func in node.out_funcs:
                    _visit(
                        self.graph[func],
                        workflow,
                    )
            else:
                raise AirflowException(
                    "Node type *%s* for  step *%s* "
                    "is not currently supported by "
                    "Airflow." % (node.type, node.name)
                )

            return workflow

        # Set the maximum number of active tasks here. For more info, see:
        # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
        airflow_dag_args = (
            {} if self.max_workers is None else dict(max_active_tasks=self.max_workers)
        )
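        # Control whether the DAG is created in a paused state.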
        airflow_dag_args["is_paused_upon_creation"] = self.is_paused_upon_creation

        # The workflow timeout should only be enforced if the DAG is scheduled.
        if self.workflow_timeout is not None and self.schedule is not None:
            airflow_dag_args["dagrun_timeout"] = dict(seconds=self.workflow_timeout)

        appending_sensors = self._collect_flow_sensors()
        workflow = Workflow(
            dag_id=self.name,
            default_args=self._create_defaults(),
            description=self.description,
            schedule_interval=self.schedule,
            # `start_date` is a mandatory argument even though the documentation lists it as an optional value.
            # Based on the code, Airflow will throw an `AirflowException` when `start_date` is not provided
            # to a DAG: https://github.com/apache/airflow/blob/0527a0b6ce506434a23bc2a6f5ddb11f492fc614/airflow/models/dag.py#L2170
            start_date=datetime.now(),
            tags=self.tags,
            file_path=self._file_path,
            graph_structure=self.graph_structure,
            metadata=dict(
                contains_foreach=self.contains_foreach, flow_name=self.flow.name
            ),
            **airflow_dag_args
        )
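        # Recursively walk the graph from the start step, adding one Airflow
        # task per Metaflow step.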
        workflow = _visit(self.graph["start"], workflow)

        workflow.set_parameters(self.parameters)
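        # Flow-level sensors must fire before anything else: register them as
        # states and prepend them to the graph structure so they precede `start`.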
        if appending_sensors:
            for s in appending_sensors:
                workflow.add_state(s)
            workflow.graph_structure.insert(0, [[s.name] for s in appending_sensors])
        return self._to_airflow_dag_file(workflow.to_dict())
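
A minimal sketch of how the compiled output might be consumed, assuming `compile()` returns the rendered DAG file contents as a string (as the call to `_to_airflow_dag_file` suggests); the `airflow_compiler` instance and the output path below are hypothetical:

    # Hypothetical driver: persist the compiled DAG definition where the
    # Airflow scheduler can discover it. `airflow_compiler` stands in for a
    # fully constructed instance of this class; the path is illustrative.
    dag_file_contents = airflow_compiler.compile()
    with open("/opt/airflow/dags/my_metaflow_dag.py", "w") as f:
        f.write(dag_file_contents)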