def _heartbeat_daemon_template()

in metaflow/plugins/argo/argo_workflows.py

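Builds the Argo DaemonTemplate that runs a heartbeat process alongside the
workflow, keeping the Metaflow run registered as alive in the metadata service
for as long as the workflow executes.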

    def _heartbeat_daemon_template(self):
        # Use all the affordances available to the _parameters task.
        executable = self.environment.executable("_parameters")
        run_id = "argo-{{workflow.name}}"
        script_name = os.path.basename(sys.argv[0])
        entrypoint = [executable, script_name]
        # FlowDecorators can define their own top-level options, which may affect
        # run-level information. Pass them to the heartbeat process as well, since
        # it might be the first task to register a run.
        top_opts_dict = {}
        for deco in flow_decorators(self.flow):
            top_opts_dict.update(deco.get_top_level_options())
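        # e.g. a @project flow decorator contributes options here that pin the
        # run to the correct project/branch namespace.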

        top_level = list(dict_to_cli_options(top_opts_dict)) + [
            "--quiet",
            "--metadata=%s" % self.metadata.TYPE,
            "--environment=%s" % self.environment.TYPE,
            "--datastore=%s" % self.flow_datastore.TYPE,
            "--datastore-root=%s" % self.flow_datastore.datastore_root,
            "--event-logger=%s" % self.event_logger.TYPE,
            "--monitor=%s" % self.monitor.TYPE,
            "--no-pylint",
            "--with=argo_workflows_internal:auto-emit-argo-events=%i"
            % self.auto_emit_argo_events,
        ]
        heartbeat_cmds = "{entrypoint} {top_level} argo-workflows heartbeat --run_id {run_id} {tags}".format(
            entrypoint=" ".join(entrypoint),
            top_level=" ".join(top_level) if top_level else "",
            run_id=run_id,
            tags=" ".join(["--tag %s" % t for t in self.tags]) if self.tags else "",
        )
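        # The rendered command looks roughly like:
        #   <executable> <script> --quiet --metadata=... --datastore=... \
        #     argo-workflows heartbeat --run_id argo-{{workflow.name}} [--tag ...]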

        # TODO: We do not strictly need MFLOG logging for the daemon at the moment,
        # but it might be useful in the future. Consider whether this setup can be dropped.
        # Configure log capture.
        mflog_expr = export_mflog_env_vars(
            datastore_type=self.flow_datastore.TYPE,
            stdout_path="$PWD/.logs/mflog_stdout",
            stderr_path="$PWD/.logs/mflog_stderr",
            flow_name=self.flow.name,
            run_id=run_id,
            step_name="_run_heartbeat_daemon",
            task_id="1",
            retry_count="0",
        )
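        # export_mflog_env_vars renders a shell snippet that exports the env vars
        # mflog needs, so the daemon's stdout/stderr end up under $PWD/.logs.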
        # TODO: Can the init be trimmed down?
        # Can we do without get_package_commands fetching the whole code package?
        init_cmds = " && ".join(
            [
                # For supporting sandboxes, ensure that a custom script is executed
                # before anything else. The script is passed in as an env var.
                '${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
                "mkdir -p $PWD/.logs",
                mflog_expr,
            ]
            + self.environment.get_package_commands(
                self.code_package_url, self.flow_datastore.TYPE
            )[:-1]
            # Replace the last package command, 'Task is starting.', with our own message.
            # FIXME: this can be brittle.
            + ["mflog 'Heartbeat daemon is starting.'"]
        )

        cmd_str = " && ".join([init_cmds, heartbeat_cmds])
        cmds = shlex.split('bash -c "%s"' % cmd_str)
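        # i.e. cmds is roughly ["bash", "-c", "<init_cmds> && <heartbeat_cmds>"]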

        # Only the env required for sending heartbeats to the metadata service, nothing extra.
        # The production token / runtime info is needed to correctly register flow branches.
        env = {
            # These values are needed by Metaflow to set its internal
            # state appropriately.
            "METAFLOW_CODE_URL": self.code_package_url,
            "METAFLOW_CODE_SHA": self.code_package_sha,
            "METAFLOW_CODE_DS": self.flow_datastore.TYPE,
            "METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
            "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
            "METAFLOW_USER": "argo-workflows",
            "METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
            "METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
            "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
            "METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
            "METAFLOW_CARD_S3ROOT": CARD_S3ROOT,
            "METAFLOW_KUBERNETES_WORKLOAD": 1,
            "METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
            "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
            "METAFLOW_OWNER": self.username,
            "METAFLOW_PRODUCTION_TOKEN": self.production_token,  # Used in identity resolving. This affects system tags.
        }
        # support Metaflow sandboxes
        env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT

        # Clean up env values: drop unset entries and any keys configured to be skipped.
        env = {
            k: v
            for k, v in env.items()
            if v is not None
            and k not in set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
        }

        # Reuse the image of the start step: it is known to be pullable from within
        # the cluster and may already contain the required libraries, letting the
        # daemon start up faster.
        start_step = next(step for step in self.flow if step.name == "start")
        resources = dict(
            next(
                deco for deco in start_step.decorators if deco.name == "kubernetes"
            ).attributes
        )
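        # A deploy to Argo Workflows runs on Kubernetes, so the start step is
        # expected to carry a kubernetes decorator here.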
        from kubernetes import client as kubernetes_sdk

        return (
            DaemonTemplate("heartbeat-daemon")
            # NOTE: Even though a retry strategy does not work for Argo daemon containers,
            # this has the side-effect of protecting the exit hooks of the workflow from failing in case the daemon container errors out.
            .retry_strategy(10, 1)
            .service_account_name(resources["service_account"])
            .container(
                to_camelcase(
                    kubernetes_sdk.V1Container(
                        name="main",
                        # TODO: Make the image configurable
                        image=resources["image"],
                        command=cmds,
                        env=[
                            kubernetes_sdk.V1EnvVar(name=k, value=str(v))
                            for k, v in env.items()
                        ],
                        env_from=[
                            kubernetes_sdk.V1EnvFromSource(
                                secret_ref=kubernetes_sdk.V1SecretEnvSource(
                                    name=str(k),
                                    # optional=True
                                )
                            )
                            # Combine step-level secrets (a single string or a
                            # list) with platform-wide secrets from the config.
                            for k in list(
                                [resources["secrets"]]
                                if isinstance(resources.get("secrets"), str)
                                else (resources.get("secrets") or [])
                            )
                            + KUBERNETES_SECRETS.split(",")
                            + ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
                            if k
                        ],
                        resources=kubernetes_sdk.V1ResourceRequirements(
                            # NOTE: Base resources are kept to a minimum to save on running costs.
                            # This slows down daemon startup, which can be fully alleviated by
                            # using a base image with the required dependencies pre-installed.
                            requests={
                                "cpu": "200m",
                                "memory": "100Mi",
                            },
                            limits={
                                "cpu": "200m",
                                "memory": "100Mi",
                            },
                        ),
                    )
                ).to_dict()
            )
        )
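
For orientation, a rough sketch of the shape the container portion of this
template serializes to after to_dict() and to_camelcase(); the values below are
illustrative, not exact output:

    {
        "name": "main",
        "image": "<image of the start step>",
        "command": ["bash", "-c", "<init_cmds> && <heartbeat_cmds>"],
        "env": [{"name": "METAFLOW_SERVICE_URL", "value": "..."}, ...],
        "envFrom": [{"secretRef": {"name": "<secret>"}}, ...],
        "resources": {
            "requests": {"cpu": "200m", "memory": "100Mi"},
            "limits": {"cpu": "200m", "memory": "100Mi"},
        },
    }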