in metaflow/plugins/argo/argo_workflows.py
def _heartbeat_daemon_template(self):
# Use all the affordances available to the _parameters task.
executable = self.environment.executable("_parameters")
run_id = "argo-{{workflow.name}}"
script_name = os.path.basename(sys.argv[0])
entrypoint = [executable, script_name]
# FlowDecorators can define their own top-level options. These might affect run-level
# information, so it is important to pass them to the heartbeat process as well, since
# it might be the first task to register a run.
top_opts_dict = {}
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())
top_level = list(dict_to_cli_options(top_opts_dict)) + [
"--quiet",
"--metadata=%s" % self.metadata.TYPE,
"--environment=%s" % self.environment.TYPE,
"--datastore=%s" % self.flow_datastore.TYPE,
"--datastore-root=%s" % self.flow_datastore.datastore_root,
"--event-logger=%s" % self.event_logger.TYPE,
"--monitor=%s" % self.monitor.TYPE,
"--no-pylint",
"--with=argo_workflows_internal:auto-emit-argo-events=%i"
% self.auto_emit_argo_events,
]
heartbeat_cmds = "{entrypoint} {top_level} argo-workflows heartbeat --run_id {run_id} {tags}".format(
entrypoint=" ".join(entrypoint),
top_level=" ".join(top_level) if top_level else "",
run_id=run_id,
tags=" ".join(["--tag %s" % t for t in self.tags]) if self.tags else "",
)
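# The rendered command resembles the following (illustrative values only):
#   python flow.py --quiet --metadata=service ... argo-workflows heartbeat \
#     --run_id argo-{{workflow.name}} --tag some_tag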
# TODO: We do not really need MFLOG logging for the daemon at the moment, but it
# might be useful in the future. Consider whether we can do without this setup.
# Configure log capture.
mflog_expr = export_mflog_env_vars(
datastore_type=self.flow_datastore.TYPE,
stdout_path="$PWD/.logs/mflog_stdout",
stderr_path="$PWD/.logs/mflog_stderr",
flow_name=self.flow.name,
run_id=run_id,
step_name="_run_heartbeat_daemon",
task_id="1",
retry_count="0",
)
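# export_mflog_env_vars renders a shell expression that exports the MFLOG-related
# env vars (capture paths for stdout/stderr plus task metadata) consumed by the
# mflog shell helper set up as part of the package commands below.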
# TODO: Can the init be trimmed down?
# Can we do without get_package_commands fetching the whole code package?
init_cmds = " && ".join(
[
# For supporting sandboxes, ensure that a custom script is executed
# before anything else is executed. The script is passed in as an
# env var.
'${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
"mkdir -p $PWD/.logs",
mflog_expr,
]
+ self.environment.get_package_commands(
self.code_package_url, self.flow_datastore.TYPE
)[:-1]
# Replace the line 'Task is starting.' with a daemon-specific message.
# FIXME: this can be brittle.
+ ["mflog 'Heartbeat daemon is starting.'"]
)
cmd_str = " && ".join([init_cmds, heartbeat_cmds])
cmds = shlex.split('bash -c "%s"' % cmd_str)
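# shlex.split turns this into the argv list handed to the container, i.e.
# (illustrative) ["bash", "-c", "<init_cmds> && <heartbeat_cmds>"].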
# Only the env vars required for sending heartbeats to the metadata service, nothing extra.
# The production token / runtime info is required to correctly register flow branches.
env = {
# These values are needed by Metaflow to set its internal
# state appropriately.
"METAFLOW_CODE_URL": self.code_package_url,
"METAFLOW_CODE_SHA": self.code_package_sha,
"METAFLOW_CODE_DS": self.flow_datastore.TYPE,
"METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
"METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
"METAFLOW_USER": "argo-workflows",
"METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
"METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
"METAFLOW_CARD_S3ROOT": CARD_S3ROOT,
"METAFLOW_KUBERNETES_WORKLOAD": 1,
"METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_OWNER": self.username,
"METAFLOW_PRODUCTION_TOKEN": self.production_token, # Used in identity resolving. This affects system tags.
}
# Support Metaflow sandboxes.
env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT
# Clean up env values: drop unset values and any vars we were told to skip.
env = {
k: v
for k, v in env.items()
if v is not None
and k not in set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
}
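# ARGO_WORKFLOWS_ENV_VARS_TO_SKIP is a comma-separated string of var names,
# e.g. "METAFLOW_CODE_URL,METAFLOW_CODE_SHA" (illustrative values).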
# We want to grab the base image used by the start step, as this is known to be pullable from within the cluster,
# and it might contain the required libraries, allowing us to start up faster.
start_step = next(step for step in self.flow if step.name == "start")
kubernetes_deco = next(
    deco for deco in start_step.decorators if deco.name == "kubernetes"
)
resources = dict(kubernetes_deco.attributes)
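# `resources` now holds the @kubernetes attributes of the start step
# (image, service_account, secrets, ...), which the daemon container reuses below.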
# Normalize the "secrets" attribute (which may be unset, a single string, or a
# list of strings) and append the platform-wide secrets so that all of them can
# be exposed to the container as env vars below.
secrets = resources.get("secrets") or []
if isinstance(secrets, str):
    secrets = [secrets]
all_secrets = (
    list(secrets)
    + KUBERNETES_SECRETS.split(",")
    + ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
)

from kubernetes import client as kubernetes_sdk
return (
DaemonTemplate("heartbeat-daemon")
# NOTE: Even though a retry strategy does not work for Argo daemon containers,
# it has the side effect of protecting the workflow's exit hooks from failing if the daemon container errors out.
.retry_strategy(10, 1)
.service_account_name(resources["service_account"])
.container(
to_camelcase(
kubernetes_sdk.V1Container(
name="main",
# TODO: Make the image configurable
image=resources["image"],
command=cmds,
env=[
kubernetes_sdk.V1EnvVar(name=k, value=str(v))
for k, v in env.items()
],
env_from=[
kubernetes_sdk.V1EnvFromSource(
secret_ref=kubernetes_sdk.V1SecretEnvSource(
name=str(k),
# optional=True
)
)
for k in all_secrets
if k
],
resources=kubernetes_sdk.V1ResourceRequirements(
# NOTE: base resources for this are kept to a minimum to save on running costs.
# This has an adverse effect on startup time for the daemon, which can be completely
# alleviated by using a base image that has the required dependencies pre-installed
requests={
"cpu": "200m",
"memory": "100Mi",
},
limits={
"cpu": "200m",
"memory": "100Mi",
},
),
)
).to_dict()
)
)
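# A sketch of the (hypothetical) call site: the returned dict is meant to be
# appended to the workflow spec's list of templates alongside the step templates:
#   templates.append(self._heartbeat_daemon_template())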