def _to_job()

in metaflow/plugins/airflow/airflow.py [0:0]


    def _to_job(self, node):
        """
        Transform the node's specification into Airflow-compatible operator arguments.
        Since this function is long, below is a summary of the two major duties it performs:
            1. Based on the type of the graph node (start/linear/foreach/join, etc.),
                decide how to set the input paths.
            2. Based on the node's decorator specification, convert the information into
                a job spec for the KubernetesPodOperator.
        """
        # Add env vars from the optional @environment decorator.
        env_deco = [deco for deco in node.decorators if deco.name == "environment"]
        env = {}
        if env_deco:
            env = env_deco[0].attributes["vars"].copy()

        # The if/else block below handles "input paths".
        # Input paths manage dataflow across the graph.
        if node.name == "start":
            # POSSIBLE_FUTURE_IMPROVEMENT:
            # We can extract metadata about possible upstream sensor triggers.
            # A previous commit (7bdf6) in the `airflow` branch has a `SensorMetaExtractor` class and
            # an associated macro built to handle this case if metadata regarding the sensor is needed.
            # Initialize parameters for the flow in the `start` step.
            # The `start` step has no upstream input dependencies aside from
            # parameters.

            if len(self.parameters):
                env["METAFLOW_PARAMETERS"] = AIRFLOW_MACROS.PARAMETERS
            input_paths = None
        else:
            # If it is not the start node, check whether a single path or many
            # paths converge into it, and set the input paths accordingly.
            if node.parallel_foreach:
                raise AirflowException(
                    "Parallel steps are not supported yet with Airflow."
                )
            is_foreach_join = (
                node.type == "join"
                and self.graph[node.split_parents[-1]].type == "foreach"
            )
            if is_foreach_join:
                input_paths = self._make_foreach_input_path(node.in_funcs[0])

            elif len(node.in_funcs) == 1:
                # Set the input path when there is only one parent node.
                # The parent task-id is passed via an XCom; there is no other way to get it.
                # One key thing about XComs is that they are immutable and only recorded if the task
                # doesn't fail.
                # From the Airflow docs:
                # "Note: If the first task run is not succeeded then on every retry task
                # XComs will be cleared to make the task run idempotent."
                input_paths = self._make_input_path(node.in_funcs[0])
            else:
                # This is a split scenario where there can be more than one input path.
                input_paths = self._compress_input_path(node.in_funcs)

            # env["METAFLOW_INPUT_PATHS"] = input_paths
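        # At this point `input_paths` is either None (for the `start` step) or an
        # expression pointing at the parent task(s); it is consumed further below
        # by `_step_cli` when building the step command.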

        env["METAFLOW_CODE_URL"] = self.code_package_url
        env["METAFLOW_FLOW_NAME"] = self.flow.name
        env["METAFLOW_STEP_NAME"] = node.name
        env["METAFLOW_OWNER"] = self.username

        metadata_env = self.metadata.get_runtime_environment("airflow")
        env.update(metadata_env)

        metaflow_version = self.environment.get_environment_info()
        metaflow_version["flow_name"] = self.graph.name
        metaflow_version["production_token"] = self.production_token
        env["METAFLOW_VERSION"] = json.dumps(metaflow_version)

        # Temporary passing of *some* environment variables. Do not rely on this
        # mechanism as it will be removed in the near future
        env.update(
            {
                k: v
                for k, v in config_values()
                if k.startswith("METAFLOW_CONDA_") or k.startswith("METAFLOW_DEBUG_")
            }
        )

        # Extract the @kubernetes decorator for constructing the arguments of the KubernetesPodOperator on Airflow.
        k8s_deco = [deco for deco in node.decorators if deco.name == "kubernetes"][0]
        user_code_retries, _ = self._get_retries(node)
        retry_delay = self._get_retry_delay(node)
        # This sets timeouts for @timeout decorators.
        # The timeout is set as "execution_timeout" for an airflow task.
        runtime_limit = get_run_time_limit_for_task(node.decorators)

        k8s = Kubernetes(self.flow_datastore, self.metadata, self.environment)
        user = util.get_username()

        labels = {
            "app": "metaflow",
            "app.kubernetes.io/name": "metaflow-task",
            "app.kubernetes.io/part-of": "metaflow",
            "app.kubernetes.io/created-by": user,
            # Question for (savin): Should we set the username here for created-by, since it is the
            # Airflow installation that is creating the jobs?
            # Technically the "user" is the stakeholder, but should these labels be present?
        }
        additional_mf_variables = {
            "METAFLOW_CODE_SHA": self.code_package_sha,
            "METAFLOW_CODE_URL": self.code_package_url,
            "METAFLOW_CODE_DS": self.flow_datastore.TYPE,
            "METAFLOW_USER": user,
            "METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
            "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
            "METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
            "METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
            "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
            "METAFLOW_DEFAULT_METADATA": "service",
            "METAFLOW_KUBERNETES_WORKLOAD": str(
                1
            ),  # This is used by the kubernetes decorator.
            "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
            "METAFLOW_CARD_S3ROOT": CARD_S3ROOT,
            "METAFLOW_RUN_ID": AIRFLOW_MACROS.RUN_ID,
            "METAFLOW_AIRFLOW_TASK_ID": AIRFLOW_MACROS.create_task_id(
                self.contains_foreach
            ),
            "METAFLOW_AIRFLOW_DAG_RUN_ID": AIRFLOW_MACROS.AIRFLOW_RUN_ID,
            "METAFLOW_AIRFLOW_JOB_ID": AIRFLOW_MACROS.AIRFLOW_JOB_ID,
            "METAFLOW_PRODUCTION_TOKEN": self.production_token,
            "METAFLOW_ATTEMPT_NUMBER": AIRFLOW_MACROS.ATTEMPT,
            # GCP stuff
            "METAFLOW_DATASTORE_SYSROOT_GS": DATASTORE_SYSROOT_GS,
            "METAFLOW_CARD_GSROOT": CARD_GSROOT,
            "METAFLOW_S3_ENDPOINT_URL": S3_ENDPOINT_URL,
        }
        env["METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT"] = (
            AZURE_STORAGE_BLOB_SERVICE_ENDPOINT
        )
        env["METAFLOW_DATASTORE_SYSROOT_AZURE"] = DATASTORE_SYSROOT_AZURE
        env["METAFLOW_CARD_AZUREROOT"] = CARD_AZUREROOT
        if DEFAULT_SECRETS_BACKEND_TYPE:
            env["METAFLOW_DEFAULT_SECRETS_BACKEND_TYPE"] = DEFAULT_SECRETS_BACKEND_TYPE
        if AWS_SECRETS_MANAGER_DEFAULT_REGION:
            env["METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_REGION"] = (
                AWS_SECRETS_MANAGER_DEFAULT_REGION
            )
        if GCP_SECRET_MANAGER_PREFIX:
            env["METAFLOW_GCP_SECRET_MANAGER_PREFIX"] = GCP_SECRET_MANAGER_PREFIX

        if AZURE_KEY_VAULT_PREFIX:
            env["METAFLOW_AZURE_KEY_VAULT_PREFIX"] = AZURE_KEY_VAULT_PREFIX

        env.update(additional_mf_variables)
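        # Entries with a value of None are dropped later when this mapping is
        # turned into the operator's `env_vars` list.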

        service_account = (
            KUBERNETES_SERVICE_ACCOUNT
            if k8s_deco.attributes["service_account"] is None
            else k8s_deco.attributes["service_account"]
        )
        k8s_namespace = (
            k8s_deco.attributes["namespace"]
            if k8s_deco.attributes["namespace"] is not None
            else "default"
        )
        qos_requests, qos_limits = qos_requests_and_limits(
            k8s_deco.attributes["qos"],
            k8s_deco.attributes["cpu"],
            k8s_deco.attributes["memory"],
            k8s_deco.attributes["disk"],
        )
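        # The limits below add a "<gpu_vendor>.com/gpu" entry only when a GPU count
        # is specified; the one-element comprehension acts as a conditional.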
        resources = dict(
            requests=qos_requests,
            limits={
                **qos_limits,
                **{
                    "%s.com/gpu".lower()
                    % k8s_deco.attributes["gpu_vendor"]: str(k8s_deco.attributes["gpu"])
                    for k in [0]
                    # Don't set GPU limits if gpu isn't specified.
                    if k8s_deco.attributes["gpu"] is not None
                },
            },
        )

        annotations = {
            "metaflow/production_token": self.production_token,
            "metaflow/owner": self.username,
            "metaflow/user": self.username,
            "metaflow/flow_name": self.flow.name,
        }
        if current.get("project_name"):
            annotations.update(
                {
                    "metaflow/project_name": current.project_name,
                    "metaflow/branch_name": current.branch_name,
                    "metaflow/project_flow_name": current.project_flow_name,
                }
            )

        k8s_operator_args = dict(
            # Like Argo Workflows, we use the step name as the name of the container.
            name=node.name,
            namespace=k8s_namespace,
            service_account_name=service_account,
            node_selector=k8s_deco.attributes["node_selector"],
            cmds=k8s._command(
                self.flow.name,
                AIRFLOW_MACROS.RUN_ID,
                node.name,
                AIRFLOW_MACROS.create_task_id(self.contains_foreach),
                AIRFLOW_MACROS.ATTEMPT,
                code_package_url=self.code_package_url,
                step_cmds=self._step_cli(
                    node, input_paths, self.code_package_url, user_code_retries
                ),
            ),
            annotations=annotations,
            image=k8s_deco.attributes["image"],
            resources=resources,
            execution_timeout=dict(seconds=runtime_limit),
            retries=user_code_retries,
            env_vars=[dict(name=k, value=v) for k, v in env.items() if v is not None],
            labels=labels,
            task_id=node.name,
            startup_timeout_seconds=AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS,
            get_logs=True,
            do_xcom_push=True,
            log_events_on_failure=True,
            is_delete_operator_pod=True,
            retry_exponential_backoff=False,  # TODO: should this be an arg we allow on the CLI? Not right now - there is an open ticket for this - maybe at some point we will.
            reattach_on_restart=False,
            secrets=[],
        )
        k8s_operator_args["in_cluster"] = True
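        # Prefer an explicitly configured Airflow connection, kubeconfig context,
        # or kubeconfig file when one is set; each of these disables in-cluster
        # credentials.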
        if AIRFLOW_KUBERNETES_CONN_ID is not None:
            k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID
            k8s_operator_args["in_cluster"] = False
        if AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT is not None:
            k8s_operator_args["cluster_context"] = AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT
            k8s_operator_args["in_cluster"] = False
        if AIRFLOW_KUBERNETES_KUBECONFIG_FILE is not None:
            k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE
            k8s_operator_args["in_cluster"] = False

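        # Merge secrets from the @kubernetes decorator (a string or a list) with
        # any deployment-wide KUBERNETES_SECRETS configured for Metaflow.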
        if k8s_deco.attributes["secrets"]:
            if isinstance(k8s_deco.attributes["secrets"], str):
                k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"].split(",")
            elif isinstance(k8s_deco.attributes["secrets"], list):
                k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"]
        if len(KUBERNETES_SECRETS) > 0:
            k8s_operator_args["secrets"] += KUBERNETES_SECRETS.split(",")

        if retry_delay:
            k8s_operator_args["retry_delay"] = dict(seconds=retry_delay.total_seconds())

        return k8s_operator_args
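
For context, below is a minimal sketch of how a dict like the one returned above could be turned into an Airflow task. This is illustrative only and not part of Metaflow's API: Metaflow's generated DAG file performs this wiring itself, the KubernetesPodOperator import path and accepted arguments vary across cncf.kubernetes provider versions, and the helper name `make_pod_task` is hypothetical. Only a representative subset of the fields is passed through.

    # Illustrative sketch only; names and import paths are assumptions, and the
    # compiled Metaflow DAG file handles this conversion itself.
    from datetime import timedelta

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

    def make_pod_task(dag: DAG, spec: dict) -> KubernetesPodOperator:
        # `spec` stands for the dict returned by _to_job(); only a few
        # representative fields are shown here.
        return KubernetesPodOperator(
            dag=dag,
            task_id=spec["task_id"],
            name=spec["name"],
            namespace=spec["namespace"],
            image=spec["image"],
            cmds=spec["cmds"],
            labels=spec["labels"],
            annotations=spec["annotations"],
            service_account_name=spec["service_account_name"],
            # The operator accepts env vars as a {name: value} mapping.
            env_vars={e["name"]: e["value"] for e in spec["env_vars"]},
            startup_timeout_seconds=spec["startup_timeout_seconds"],
            get_logs=spec["get_logs"],
            do_xcom_push=spec["do_xcom_push"],
            in_cluster=spec["in_cluster"],
            retries=spec["retries"],
            # Airflow expects a timedelta here, not the plain {"seconds": ...} dict.
            execution_timeout=timedelta(**spec["execution_timeout"]),
        )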