in metaflow/plugins/airflow/airflow.py [0:0]
def compile(self):
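    """
    Compile the Metaflow flow graph into an Airflow `Workflow` object, then
    hand its serialized form to `_to_airflow_dag_file` and return the result.

    Unsupported constructs (@trigger / @trigger_on_finish, certain @kubernetes
    attributes, and unknown node types) raise an `AirflowException`.
    """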
    if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
        "trigger_on_finish"
    ):
        raise AirflowException(
            "Deploying flows with @trigger or @trigger_on_finish decorator(s) "
            "to Airflow is not supported currently."
        )
    # Visit every node of the flow and recursively build the state machine.
    def _visit(node, workflow, exit_node=None):
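        # Indexing [0] below assumes that every step carries a @kubernetes
        # decorator when the flow is deployed to Airflow.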
        kube_deco = dict(
            [deco for deco in node.decorators if deco.name == "kubernetes"][
                0
            ].attributes
        )
        if kube_deco:
            # These @kubernetes attributes are currently unsupported on Airflow.
            # For tmpfs, guarding use_tmpfs and tmpfs_size is enough since these
            # determine whether tmpfs is enabled.
            for attr in [
                "use_tmpfs",
                "tmpfs_size",
                "persistent_volume_claims",
                "image_pull_policy",
            ]:
                if kube_deco[attr]:
                    raise AirflowException(
                        "The decorator attribute *%s* is currently not supported on Airflow "
                        "for the @kubernetes decorator on step *%s*"
                        % (attr, node.name)
                    )
        parent_is_foreach = any(  # Any immediate parent is a foreach node.
            self.graph[n].type == "foreach" for n in node.in_funcs
        )
        state = AirflowTask(
            node.name, is_mapper_node=parent_is_foreach
        ).set_operator_args(**self._to_job(node))
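        # Wire the new state into the workflow based on the step's node type.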
        if node.type == "end":
            workflow.add_state(state)
        # Continue linear assignment within the (sub)workflow if the node
        # doesn't branch or fork.
        elif node.type in ("start", "linear", "join", "foreach"):
            workflow.add_state(state)
            _visit(
                self.graph[node.out_funcs[0]],
                workflow,
            )
        elif node.type == "split":
            workflow.add_state(state)
            for func in node.out_funcs:
                _visit(
                    self.graph[func],
                    workflow,
                )
        else:
            raise AirflowException(
                "Node type *%s* for step *%s* "
                "is not currently supported by "
                "Airflow." % (node.type, node.name)
            )
        return workflow
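    # Translate flow-level options (max workers, paused state, workflow timeout)
    # into Airflow DAG arguments.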
    # Set the maximum number of active tasks. For more information, see:
    # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
    airflow_dag_args = (
        {} if self.max_workers is None else dict(max_active_tasks=self.max_workers)
    )
    airflow_dag_args["is_paused_upon_creation"] = self.is_paused_upon_creation
    # The workflow timeout is only enforced when the DAG runs on a schedule.
    if self.workflow_timeout is not None and self.schedule is not None:
        airflow_dag_args["dagrun_timeout"] = dict(seconds=self.workflow_timeout)
    appending_sensors = self._collect_flow_sensors()
    workflow = Workflow(
        dag_id=self.name,
        default_args=self._create_defaults(),
        description=self.description,
        schedule_interval=self.schedule,
        # `start_date` is a mandatory argument even though the documentation lists
        # it as an optional value. Based on the code, Airflow throws an
        # `AirflowException` when `start_date` is not provided to a DAG:
        # https://github.com/apache/airflow/blob/0527a0b6ce506434a23bc2a6f5ddb11f492fc614/airflow/models/dag.py#L2170
        start_date=datetime.now(),
        tags=self.tags,
        file_path=self._file_path,
        graph_structure=self.graph_structure,
        metadata=dict(
            contains_foreach=self.contains_foreach, flow_name=self.flow.name
        ),
        **airflow_dag_args
    )
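    # Walk the graph from the start step to populate the workflow with states.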
    workflow = _visit(self.graph["start"], workflow)
    workflow.set_parameters(self.parameters)
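    # Prepend the sensors to the graph structure so they form the first layer
    # of the generated DAG, upstream of the start step.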
    if len(appending_sensors) > 0:
        for s in appending_sensors:
            workflow.add_state(s)
        workflow.graph_structure.insert(0, [[s.name] for s in appending_sensors])
    return self._to_airflow_dag_file(workflow.to_dict())