# Excerpt: _compile_workflow_template()
# from metaflow/plugins/argo/argo_workflows.py


    def _compile_workflow_template(self):
        """Compile this Metaflow flow into an Argo ``WorkflowTemplate``.

        Builds the template metadata (annotations capturing cron schedule,
        parameters, tags, triggers, notification settings, and the DAG
        structure) and the workflow spec (timeouts, parallelism, priority,
        parameters, pod metadata, exit/notification lifecycle hooks, and the
        DAG / container / exit-hook / daemon templates).

        Returns
        -------
        WorkflowTemplate
            The fully-populated Argo WorkflowTemplate builder object.
        """
        # This method compiles a Metaflow FlowSpec into Argo WorkflowTemplate
        #
        # WorkflowTemplate
        #   |
        #    -- WorkflowSpec
        #         |
        #          -- Array<Template>
        #                     |
        #                      -- DAGTemplate, ContainerTemplate
        #                           |                  |
        #                            -- Array<DAGTask> |
        #                                       |      |
        #                                        -- Template
        #
        # Steps in FlowSpec are represented as DAGTasks.
        # A DAGTask can reference to -
        #     a ContainerTemplate (for linear steps..) or
        #     another DAGTemplate (for nested `foreach`s).
        #
        # While we could have very well inlined container templates inside a DAGTask,
        # unfortunately Argo variable substitution ({{pod.name}}) doesn't work as
        # expected within DAGTasks
        # (https://github.com/argoproj/argo-workflows/issues/7432) and we are forced to
        # generate container templates at the top level (in WorkflowSpec) and maintain
        # references to them within the DAGTask.

        annotations = {}
        if self._schedule is not None:
            # timezone is an optional field and json dumps on None will result in null
            # hence configuring it to an empty string
            # NOTE(review): this mutates self._timezone in place — any later reader
            # of self._timezone will see "" instead of None after this call.
            if self._timezone is None:
                self._timezone = ""
            cron_info = {"schedule": self._schedule, "tz": self._timezone}
            annotations.update({"metaflow/cron": json.dumps(cron_info)})

        if self.parameters:
            annotations.update({"metaflow/parameters": json.dumps(self.parameters)})

        # Some more annotations to populate the Argo UI nicely
        if self.tags:
            annotations.update({"metaflow/tags": json.dumps(self.tags)})
        if self.triggers:
            # Only the name and type of each trigger are recorded — keeps the
            # annotation payload small.
            annotations.update(
                {
                    "metaflow/triggers": json.dumps(
                        [
                            {key: trigger.get(key) for key in ["name", "type"]}
                            for trigger in self.triggers
                        ]
                    )
                }
            )
        if self.notify_on_error:
            # Record which notification channels are configured (booleans only —
            # never the secrets themselves).
            annotations.update(
                {
                    "metaflow/notify_on_error": json.dumps(
                        {
                            "slack": bool(self.notify_slack_webhook_url),
                            "pager_duty": bool(self.notify_pager_duty_integration_key),
                            "incident_io": bool(self.notify_incident_io_api_key),
                        }
                    )
                }
            )
        if self.notify_on_success:
            annotations.update(
                {
                    "metaflow/notify_on_success": json.dumps(
                        {
                            "slack": bool(self.notify_slack_webhook_url),
                            "pager_duty": bool(self.notify_pager_duty_integration_key),
                            "incident_io": bool(self.notify_incident_io_api_key),
                        }
                    )
                }
            )
        try:
            # Build the DAG based on the DAGNodes given by the FlowGraph for the found FlowSpec class.
            _steps_info, graph_structure = self.graph.output_steps()
            graph_info = {
                # for the time being, we only need the graph_structure. Being mindful of annotation size limits we do not include anything extra.
                "graph_structure": graph_structure
            }
        except Exception:
            # Best-effort: a missing DAG annotation is preferable to failing the
            # whole compilation. graph_info stays None and serializes as "null".
            graph_info = None

        dag_annotation = {"metaflow/dag": json.dumps(graph_info)}

        return (
            WorkflowTemplate()
            .metadata(
                # Workflow Template metadata.
                # NOTE(review): .annotations() is called three times below —
                # assumes the builder merges (rather than replaces) annotation
                # maps, with dag_annotation layered last. Confirm in ObjectMeta.
                ObjectMeta()
                .name(self.name)
                # Argo currently only supports Workflow-level namespaces. When v3.4.0
                # is released, we should be able to support multi-namespace /
                # multi-cluster scheduling.
                .namespace(KUBERNETES_NAMESPACE)
                .annotations(annotations)
                .annotations(self._base_annotations)
                .labels(self._base_labels)
                .label("app.kubernetes.io/name", "metaflow-flow")
                .annotations(dag_annotation)
            )
            .spec(
                WorkflowSpec()
                # Set overall workflow timeout.
                .active_deadline_seconds(self.workflow_timeout)
                # TODO: Allow Argo to optionally archive all workflow execution logs
                #       It's disabled for now since it requires all Argo installations
                #       to enable an artifactory repository. If log archival is
                #       enabled in workflow controller, the logs for this workflow will
                #       automatically get archived.
                # .archive_logs()
                # Don't automount service tokens for now - https://github.com/kubernetes/kubernetes/issues/16779#issuecomment-159656641
                # TODO: Service account names are currently set in the templates. We
                #       can specify the default service account name here to reduce
                #       the size of the generated YAML by a tiny bit.
                # .automount_service_account_token()
                # TODO: Support ImagePullSecrets for Argo & Kubernetes
                #       Not strictly needed since a very valid workaround exists
                #       https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#add-imagepullsecrets-to-a-service-account
                # .image_pull_secrets(...)
                # Limit workflow parallelism
                .parallelism(self.max_workers)
                # TODO: Support Prometheus metrics for Argo
                # .metrics(...)
                # TODO: Support PodGC and DisruptionBudgets
                .priority(self.workflow_priority)
                # Set workflow metadata
                .workflow_metadata(
                    Metadata()
                    .labels(self._base_labels)
                    .label("app.kubernetes.io/name", "metaflow-run")
                    .annotations(
                        {
                            **annotations,
                            **self._base_annotations,
                            **{"metaflow/run_id": "argo-{{workflow.name}}"},
                        }
                    )
                    # TODO: Set dynamic labels using labels_from. Ideally, we would
                    #       want to expose run_id as a label. It's easy to add labels,
                    #       but very difficult to remove them - let's err on the
                    #       conservative side and only add labels when we come across
                    #       use-cases for them.
                )
                # Handle parameters
                .arguments(
                    Arguments().parameters(
                        [
                            Parameter(parameter["name"])
                            # JSON-typed values are wrapped in single quotes so
                            # the value survives as a string in the Argo spec.
                            .value(
                                "'%s'" % parameter["value"]
                                if parameter["type"] == "JSON"
                                else parameter["value"]
                            )
                            .description(parameter.get("description"))
                            # TODO: Better handle IncludeFile in Argo Workflows UI.
                            for parameter in self.parameters.values()
                        ]
                        + [
                            # Introduce non-required parameters for argo events so
                            # that the entire event payload can be accessed within the
                            # run. The parameter name is hashed to ensure that
                            # there won't be any collisions with Metaflow parameters.
                            Parameter(event["sanitized_name"])
                            .value(json.dumps(None))  # None in Argo Workflows world.
                            .description("auto-set by metaflow. safe to ignore.")
                            for event in self.triggers
                        ]
                    )
                )
                # Set common pod metadata.
                .pod_metadata(
                    Metadata()
                    .labels(self._base_labels)
                    .label("app.kubernetes.io/name", "metaflow-task")
                    .annotations(
                        {
                            **annotations,
                            **self._base_annotations,
                            **{
                                "metaflow/run_id": "argo-{{workflow.name}}"
                            },  # we want pods of the workflow to have the run_id as an annotation as well
                        }
                    )
                )
                # Set the entrypoint to flow name
                .entrypoint(self.flow.name)
                # OnExit hooks
                # NOTE(review): passing None presumably leaves onExit unset on the
                # rendered spec — verify the builder treats None as a no-op.
                .onExit(
                    "capture-error-hook-fn-preflight"
                    if self.enable_error_msg_capture
                    else None
                )
                # Set exit hook handlers if notifications are enabled.
                # Each channel registers hooks conditionally; Argo's "Failed" and
                # "Error" statuses are distinct, so error channels register one
                # hook per status (both pointing at the same *-on-error template).
                .hooks(
                    {
                        **(
                            {
                                # workflow status maps to Completed
                                "notify-slack-on-success": LifecycleHook()
                                .expression("workflow.status == 'Succeeded'")
                                .template("notify-slack-on-success"),
                            }
                            if self.notify_on_success and self.notify_slack_webhook_url
                            else {}
                        ),
                        **(
                            {
                                # workflow status maps to Completed
                                "notify-pager-duty-on-success": LifecycleHook()
                                .expression("workflow.status == 'Succeeded'")
                                .template("notify-pager-duty-on-success"),
                            }
                            if self.notify_on_success
                            and self.notify_pager_duty_integration_key
                            else {}
                        ),
                        **(
                            {
                                # workflow status maps to Completed
                                "notify-incident-io-on-success": LifecycleHook()
                                .expression("workflow.status == 'Succeeded'")
                                .template("notify-incident-io-on-success"),
                            }
                            if self.notify_on_success
                            and self.notify_incident_io_api_key
                            else {}
                        ),
                        **(
                            {
                                # workflow status maps to Failed or Error
                                "notify-slack-on-failure": LifecycleHook()
                                .expression("workflow.status == 'Failed'")
                                .template("notify-slack-on-error"),
                                "notify-slack-on-error": LifecycleHook()
                                .expression("workflow.status == 'Error'")
                                .template("notify-slack-on-error"),
                            }
                            if self.notify_on_error and self.notify_slack_webhook_url
                            else {}
                        ),
                        **(
                            {
                                # workflow status maps to Failed or Error
                                "notify-pager-duty-on-failure": LifecycleHook()
                                .expression("workflow.status == 'Failed'")
                                .template("notify-pager-duty-on-error"),
                                "notify-pager-duty-on-error": LifecycleHook()
                                .expression("workflow.status == 'Error'")
                                .template("notify-pager-duty-on-error"),
                            }
                            if self.notify_on_error
                            and self.notify_pager_duty_integration_key
                            else {}
                        ),
                        **(
                            {
                                # workflow status maps to Failed or Error
                                "notify-incident-io-on-failure": LifecycleHook()
                                .expression("workflow.status == 'Failed'")
                                .template("notify-incident-io-on-error"),
                                "notify-incident-io-on-error": LifecycleHook()
                                .expression("workflow.status == 'Error'")
                                .template("notify-incident-io-on-error"),
                            }
                            if self.notify_on_error and self.notify_incident_io_api_key
                            else {}
                        ),
                        # Warning: terrible hack to workaround a bug in Argo Workflow
                        #          where the hooks listed above do not execute unless
                        #          there is an explicit exit hook. as and when this
                        #          bug is patched, we should remove this effectively
                        #          no-op hook.
                        **(
                            {"exit": LifecycleHook().template("exit-hook-hack")}
                            if self.notify_on_error or self.notify_on_success
                            else {}
                        ),
                    }
                )
                # Top-level DAG template(s)
                .templates(self._dag_templates())
                # Container templates
                .templates(self._container_templates())
                # Exit hook template(s)
                .templates(self._exit_hook_templates())
                # Sidecar templates (Daemon Containers)
                .templates(self._daemon_templates())
            )
        )