in metaflow/plugins/airflow/airflow.py [0:0]
def _to_job(self, node):
"""
This function will transform the node's specification into Airflow compatible operator arguments.
Since this function is long, below is the summary of the two major duties it performs:
1. Based on the type of the graph node (start/linear/foreach/join etc.)
it will decide how to set the input paths
2. Based on node's decorator specification convert the information into
a job spec for the KubernetesPodOperator.
"""
# Add env vars from the optional @environment decorator.
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
env = {}
if env_deco:
env = env_deco[0].attributes["vars"].copy()
    # The if/else block below handles "input paths", which manage dataflow across
    # the graph; an illustrative note follows the block.
if node.name == "start":
        # POSSIBLE_FUTURE_IMPROVEMENT:
        # We could extract metadata about possible upstream sensor triggers.
        # A previous commit (7bdf6) in the `airflow` branch has a `SensorMetaExtractor` class
        # and an associated macro built to handle this case, should metadata about the sensor
        # be needed.
        # Initialize parameters for the flow in the `start` step.
        # The `start` step has no upstream input dependencies aside from parameters.
if len(self.parameters):
env["METAFLOW_PARAMETERS"] = AIRFLOW_MACROS.PARAMETERS
input_paths = None
else:
        # If this is not the start node, check whether one path or many paths
        # converge into it, and set the input paths accordingly.
if node.parallel_foreach:
raise AirflowException(
"Parallel steps are not supported yet with Airflow."
)
is_foreach_join = (
node.type == "join"
and self.graph[node.split_parents[-1]].type == "foreach"
)
if is_foreach_join:
input_paths = self._make_foreach_input_path(node.in_funcs[0])
elif len(node.in_funcs) == 1:
            # Set the input path when there is only one parent node.
            # The parent task-id is passed via XCom; there is no other way to get it.
            # One key thing about XComs is that they are immutable and only accepted if the
            # task doesn't fail.
            # From the Airflow docs:
            # "Note: If the first task run is not succeeded then on every retry task
            # XComs will be cleared to make the task run idempotent."
input_paths = self._make_input_path(node.in_funcs[0])
else:
            # This is a split scenario where there can be more than one input path.
input_paths = self._compress_input_path(node.in_funcs)
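    # Illustrative note (an assumption, not verbatim output): at runtime a rendered
    # input path for a single parent typically looks like
    # "<run-id>/<parent-step>/<parent-task-id>", while the compressed and foreach-join
    # variants pack several parent task-ids into one string. The exact templates are
    # produced by _make_input_path, _compress_input_path, and _make_foreach_input_path.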
# env["METAFLOW_INPUT_PATHS"] = input_paths
env["METAFLOW_CODE_URL"] = self.code_package_url
env["METAFLOW_FLOW_NAME"] = self.flow.name
env["METAFLOW_STEP_NAME"] = node.name
env["METAFLOW_OWNER"] = self.username
metadata_env = self.metadata.get_runtime_environment("airflow")
env.update(metadata_env)
metaflow_version = self.environment.get_environment_info()
metaflow_version["flow_name"] = self.graph.name
metaflow_version["production_token"] = self.production_token
env["METAFLOW_VERSION"] = json.dumps(metaflow_version)
# Temporary passing of *some* environment variables. Do not rely on this
# mechanism as it will be removed in the near future
env.update(
{
k: v
for k, v in config_values()
if k.startswith("METAFLOW_CONDA_") or k.startswith("METAFLOW_DEBUG_")
}
)
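    # config_values() yields (name, value) pairs of the Metaflow configuration, so only
    # the Conda- and debug-related settings are forwarded to the remote task here.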
# Extract the k8s decorators for constructing the arguments of the K8s Pod Operator on Airflow.
k8s_deco = [deco for deco in node.decorators if deco.name == "kubernetes"][0]
user_code_retries, _ = self._get_retries(node)
retry_delay = self._get_retry_delay(node)
    # Set the task timeout from the @timeout decorator.
    # The timeout is passed as "execution_timeout" to the Airflow task.
runtime_limit = get_run_time_limit_for_task(node.decorators)
k8s = Kubernetes(self.flow_datastore, self.metadata, self.environment)
user = util.get_username()
labels = {
"app": "metaflow",
"app.kubernetes.io/name": "metaflow-task",
"app.kubernetes.io/part-of": "metaflow",
"app.kubernetes.io/created-by": user,
        # Question for (savin): should "created-by" be set to the username, given that it is
        # the Airflow installation that actually creates the jobs? Technically the "user" is
        # the stakeholder, but should these labels be present at all?
}
additional_mf_variables = {
"METAFLOW_CODE_SHA": self.code_package_sha,
"METAFLOW_CODE_URL": self.code_package_url,
"METAFLOW_CODE_DS": self.flow_datastore.TYPE,
"METAFLOW_USER": user,
"METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
"METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
"METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
"METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": "service",
"METAFLOW_KUBERNETES_WORKLOAD": str(
1
), # This is used by kubernetes decorator.
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_CARD_S3ROOT": CARD_S3ROOT,
"METAFLOW_RUN_ID": AIRFLOW_MACROS.RUN_ID,
"METAFLOW_AIRFLOW_TASK_ID": AIRFLOW_MACROS.create_task_id(
self.contains_foreach
),
"METAFLOW_AIRFLOW_DAG_RUN_ID": AIRFLOW_MACROS.AIRFLOW_RUN_ID,
"METAFLOW_AIRFLOW_JOB_ID": AIRFLOW_MACROS.AIRFLOW_JOB_ID,
"METAFLOW_PRODUCTION_TOKEN": self.production_token,
"METAFLOW_ATTEMPT_NUMBER": AIRFLOW_MACROS.ATTEMPT,
# GCP stuff
"METAFLOW_DATASTORE_SYSROOT_GS": DATASTORE_SYSROOT_GS,
"METAFLOW_CARD_GSROOT": CARD_GSROOT,
"METAFLOW_S3_ENDPOINT_URL": S3_ENDPOINT_URL,
}
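    # Note: the AIRFLOW_MACROS.* entries above are not concrete values at DAG-generation
    # time; they are Airflow template strings that get rendered per task instance at
    # runtime (an assumption based on their use as run-id/task-id/attempt placeholders).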
env["METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT"] = (
AZURE_STORAGE_BLOB_SERVICE_ENDPOINT
)
env["METAFLOW_DATASTORE_SYSROOT_AZURE"] = DATASTORE_SYSROOT_AZURE
env["METAFLOW_CARD_AZUREROOT"] = CARD_AZUREROOT
if DEFAULT_SECRETS_BACKEND_TYPE:
env["METAFLOW_DEFAULT_SECRETS_BACKEND_TYPE"] = DEFAULT_SECRETS_BACKEND_TYPE
if AWS_SECRETS_MANAGER_DEFAULT_REGION:
env["METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_REGION"] = (
AWS_SECRETS_MANAGER_DEFAULT_REGION
)
if GCP_SECRET_MANAGER_PREFIX:
env["METAFLOW_GCP_SECRET_MANAGER_PREFIX"] = GCP_SECRET_MANAGER_PREFIX
if AZURE_KEY_VAULT_PREFIX:
env["METAFLOW_AZURE_KEY_VAULT_PREFIX"] = AZURE_KEY_VAULT_PREFIX
env.update(additional_mf_variables)
service_account = (
KUBERNETES_SERVICE_ACCOUNT
if k8s_deco.attributes["service_account"] is None
else k8s_deco.attributes["service_account"]
)
k8s_namespace = (
k8s_deco.attributes["namespace"]
if k8s_deco.attributes["namespace"] is not None
else "default"
)
qos_requests, qos_limits = qos_requests_and_limits(
k8s_deco.attributes["qos"],
k8s_deco.attributes["cpu"],
k8s_deco.attributes["memory"],
k8s_deco.attributes["disk"],
)
    resources = dict(
        requests=qos_requests,
        limits={
            **qos_limits,
            # Don't set GPU limits if gpu isn't specified; lowercase the vendor so the
            # resource name matches e.g. "nvidia.com/gpu".
            **(
                {
                    ("%s.com/gpu" % k8s_deco.attributes["gpu_vendor"]).lower(): str(
                        k8s_deco.attributes["gpu"]
                    )
                }
                if k8s_deco.attributes["gpu"] is not None
                else {}
            ),
        },
    )
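    # Illustrative shape of the resulting structure (values are made up, not taken from
    # a real run):
    #   {"requests": {"cpu": "2", "memory": "4096M"},
    #    "limits": {"nvidia.com/gpu": "1"}}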
annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": self.username,
"metaflow/flow_name": self.flow.name,
}
if current.get("project_name"):
annotations.update(
{
"metaflow/project_name": current.project_name,
"metaflow/branch_name": current.branch_name,
"metaflow/project_flow_name": current.project_flow_name,
}
)
k8s_operator_args = dict(
        # Like Argo Workflows, we use the step name as the name of the container.
name=node.name,
namespace=k8s_namespace,
service_account_name=service_account,
node_selector=k8s_deco.attributes["node_selector"],
cmds=k8s._command(
self.flow.name,
AIRFLOW_MACROS.RUN_ID,
node.name,
AIRFLOW_MACROS.create_task_id(self.contains_foreach),
AIRFLOW_MACROS.ATTEMPT,
code_package_url=self.code_package_url,
step_cmds=self._step_cli(
node, input_paths, self.code_package_url, user_code_retries
),
),
annotations=annotations,
image=k8s_deco.attributes["image"],
resources=resources,
execution_timeout=dict(seconds=runtime_limit),
retries=user_code_retries,
env_vars=[dict(name=k, value=v) for k, v in env.items() if v is not None],
labels=labels,
task_id=node.name,
startup_timeout_seconds=AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS,
get_logs=True,
do_xcom_push=True,
log_events_on_failure=True,
is_delete_operator_pod=True,
        # TODO: should exponential backoff be an arg we allow on the CLI? Not right now;
        # there is an open ticket for this, so maybe at some point.
        retry_exponential_backoff=False,
reattach_on_restart=False,
secrets=[],
)
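    # These keys largely mirror the kwargs of Airflow's KubernetesPodOperator (plus a few
    # BaseOperator arguments such as retries and retry_delay). execution_timeout and
    # retry_delay are kept as plain dicts here and are presumably converted to timedelta
    # objects when the DAG file is compiled.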
k8s_operator_args["in_cluster"] = True
if AIRFLOW_KUBERNETES_CONN_ID is not None:
k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID
k8s_operator_args["in_cluster"] = False
if AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT is not None:
k8s_operator_args["cluster_context"] = AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT
k8s_operator_args["in_cluster"] = False
if AIRFLOW_KUBERNETES_KUBECONFIG_FILE is not None:
k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE
k8s_operator_args["in_cluster"] = False
if k8s_deco.attributes["secrets"]:
if isinstance(k8s_deco.attributes["secrets"], str):
k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"].split(",")
elif isinstance(k8s_deco.attributes["secrets"], list):
k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"]
if len(KUBERNETES_SECRETS) > 0:
k8s_operator_args["secrets"] += KUBERNETES_SECRETS.split(",")
if retry_delay:
k8s_operator_args["retry_delay"] = dict(seconds=retry_delay.total_seconds())
return k8s_operator_args