def _error_msg_capture_hook_templates()

in metaflow/plugins/argo/argo_workflows.py [0:0]


    def _error_msg_capture_hook_templates(self):
        """
        Build the Argo templates for the hook that captures a failed workflow's
        error message.

        Two templates are returned:

        * ``error-msg-capture-hook`` - a container template that downloads the
          code package, runs ``python -m metaflow.plugins.argo.capture_error``,
          exports its stdout as ``METAFLOW_ARGO_ERROR``, prints the ``message``
          field of that value (parsed as JSON), and finally evals
          ``METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT`` when it is set.
        * ``capture-error-hook-fn-preflight`` - a steps template that invokes
          the container template only when ``{{workflow.status}} != Succeeded``.

        The container reuses the image and service account configured on the
        ``kubernetes`` decorator of the ``start`` step.

        NOTE(review): the shell commands are chained with ``&&``, so if
        ``capture_error`` or the JSON parsing of its output fails, the custom
        capture-error script is skipped - confirm this is intended.

        Returns
        -------
        list
            The two ``Template`` objects described above.

        Raises
        ------
        IndexError
            If the graph has no ``start`` step, or the ``start`` step has no
            ``kubernetes`` decorator.
        """
        from kubernetes import client as kubernetes_sdk

        start_step = [step for step in self.graph if step.name == "start"][0]
        # We want to grab the base image used by the start step, as this is known
        # to be pullable from within the cluster, and it might contain the
        # required libraries, allowing us to start up faster.
        resources = dict(
            [deco for deco in start_step.decorators if deco.name == "kubernetes"][
                0
            ].attributes
        )

        run_id_template = "argo-{{workflow.name}}"

        mflog_expr = export_mflog_env_vars(
            datastore_type=self.flow_datastore.TYPE,
            stdout_path="$PWD/.logs/mflog_stdout",
            stderr_path="$PWD/.logs/mflog_stderr",
            flow_name=self.flow.name,
            run_id=run_id_template,
            step_name="_run_capture_error",
            task_id="1",
            retry_count="0",
        )

        cmds = " && ".join(
            [
                # For supporting sandboxes, ensure that a custom script is executed
                # before anything else is executed. The script is passed in as an
                # env var.
                '${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
                "mkdir -p $PWD/.logs",
                mflog_expr,
            ]
            # Drop the final package command (the generic 'Task is starting'
            # log line) and log a hook-specific message in its place.
            # FIXME: this can be brittle - it relies on that line being last.
            + self.environment.get_package_commands(
                self.code_package_url, self.flow_datastore.TYPE
            )[:-1]
            + ["mflog 'Error capture hook is starting.'"]
            + ["argo_error=$(python -m 'metaflow.plugins.argo.capture_error')"]
            + ["export METAFLOW_ARGO_ERROR=$argo_error"]
            + [
                """python -c 'import json, os; error_obj=os.getenv(\\"METAFLOW_ARGO_ERROR\\");data=json.loads(error_obj); print(data[\\"message\\"])'"""
            ]
            + [
                'if [ -n \\"${METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT}\\" ]; then eval \\"${METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT}\\"; fi'
            ]
        )

        # TODO: Also capture the first failed task id
        cmds = shlex.split('bash -c "%s"' % cmds)

        env = {
            # These values are needed by Metaflow to set its internal
            # state appropriately.
            "METAFLOW_CODE_URL": self.code_package_url,
            "METAFLOW_CODE_SHA": self.code_package_sha,
            "METAFLOW_CODE_DS": self.flow_datastore.TYPE,
            "METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
            "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
            "METAFLOW_USER": "argo-workflows",
            "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
            "METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
            "METAFLOW_OWNER": self.username,
            # Support Metaflow sandboxes.
            "METAFLOW_INIT_SCRIPT": KUBERNETES_SANDBOX_INIT_SCRIPT,
            "METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT": ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT,
            # Argo expressions resolved at run time for the capture_error module.
            "METAFLOW_WORKFLOW_NAME": "{{workflow.name}}",
            "METAFLOW_WORKFLOW_NAMESPACE": "{{workflow.namespace}}",
            "METAFLOW_ARGO_WORKFLOW_FAILURES": "{{workflow.failures}}",
        }
        # Drop unset values and any variable the deployment asked to skip.
        # The skip-set is built once instead of once per env var.
        env_vars_to_skip = set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
        env = {
            k: v
            for k, v in env.items()
            if v is not None and k not in env_vars_to_skip
        }

        # The start step's decorator may specify secrets as a single name or as
        # a list; normalize to a list, then append the globally configured ones.
        decorator_secrets = resources.get("secrets")
        if not decorator_secrets:
            decorator_secrets = []
        elif isinstance(decorator_secrets, str):
            decorator_secrets = [decorator_secrets]
        # Empty names (e.g. from splitting an empty config string) are dropped.
        secret_names = [
            name
            for name in list(decorator_secrets)
            + KUBERNETES_SECRETS.split(",")
            + ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
            if name
        ]

        return [
            Template("error-msg-capture-hook")
            .service_account_name(resources["service_account"])
            .container(
                to_camelcase(
                    kubernetes_sdk.V1Container(
                        name="main",
                        command=cmds,
                        image=resources["image"],
                        env=[
                            kubernetes_sdk.V1EnvVar(name=k, value=str(v))
                            for k, v in env.items()
                        ],
                        env_from=[
                            kubernetes_sdk.V1EnvFromSource(
                                secret_ref=kubernetes_sdk.V1SecretEnvSource(
                                    name=str(name),
                                    # optional=True
                                )
                            )
                            for name in secret_names
                        ],
                        resources=kubernetes_sdk.V1ResourceRequirements(
                            # NOTE: base resources for this are kept to a minimum to
                            # save on running costs. This has an adverse effect on
                            # startup time for the hook, which can be alleviated by
                            # using a base image that has the required dependencies
                            # pre-installed.
                            requests={
                                "cpu": "200m",
                                "memory": "100Mi",
                            },
                            limits={
                                "cpu": "200m",
                                "memory": "500Mi",
                            },
                        ),
                    ).to_dict()
                )
            ),
            Template("capture-error-hook-fn-preflight").steps(
                [
                    WorkflowStep()
                    .name("capture-error-hook-fn-preflight")
                    .template("error-msg-capture-hook")
                    .when("{{workflow.status}} != Succeeded")
                ]
            ),
        ]