in metaflow/plugins/argo/argo_workflows.py [0:0]
def _error_msg_capture_hook_templates(self):
    """Build the Argo templates that capture a workflow's error message.

    Two templates are produced:

    * ``error-msg-capture-hook`` -- a container template that runs
      ``metaflow.plugins.argo.capture_error``, exports its output as
      ``METAFLOW_ARGO_ERROR``, prints the ``message`` field of that JSON
      payload and finally evals the optional
      ``METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT``.
    * ``capture-error-hook-fn-preflight`` -- a steps template that invokes
      the container template only when ``{{workflow.status}}`` is not
      ``Succeeded``.

    The container reuses the image, service account and secrets declared on
    the ``kubernetes`` decorator of the ``start`` step.

    Returns
    -------
    list
        The two ``Template`` objects, container template first.

    Raises
    ------
    IndexError
        If the graph has no ``start`` step or that step carries no
        ``kubernetes`` decorator.
    KeyError
        If the decorator attributes lack ``service_account`` or ``image``.
    """
    from kubernetes import client as kubernetes_sdk

    start_step = [step for step in self.graph if step.name == "start"][0]
    # We want to grab the base image used by the start step, as this is known
    # to be pullable from within the cluster, and it might contain the
    # required libraries, allowing us to start up faster.
    resources = dict(
        [deco for deco in start_step.decorators if deco.name == "kubernetes"][
            0
        ].attributes
    )

    run_id_template = "argo-{{workflow.name}}"

    # NOTE(review): `metaflow_version` is not read again anywhere in this
    # method. It is kept because it is unclear from here whether
    # `get_environment_info()` has side effects -- confirm and remove.
    metaflow_version = self.environment.get_environment_info()
    metaflow_version["flow_name"] = self.graph.name
    metaflow_version["production_token"] = self.production_token

    mflog_expr = export_mflog_env_vars(
        datastore_type=self.flow_datastore.TYPE,
        stdout_path="$PWD/.logs/mflog_stdout",
        stderr_path="$PWD/.logs/mflog_stderr",
        flow_name=self.flow.name,
        run_id=run_id_template,
        step_name="_run_capture_error",
        task_id="1",
        retry_count="0",
    )

    cmds = " && ".join(
        [
            # For supporting sandboxes, ensure that a custom script is executed
            # before anything else is executed. The script is passed in as an
            # env var.
            '${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
            "mkdir -p $PWD/.logs",
            mflog_expr,
        ]
        # Drop the last package command (presumably the 'Task is starting'
        # log line) and replace it with our own message below.
        # FIXME: this can be brittle, it relies on that line being last.
        + self.environment.get_package_commands(
            self.code_package_url, self.flow_datastore.TYPE
        )[:-1]
        + ["mflog 'Error capture hook is starting.'"]
        + ["argo_error=$(python -m 'metaflow.plugins.argo.capture_error')"]
        + ["export METAFLOW_ARGO_ERROR=$argo_error"]
        + [
            """python -c 'import json, os; error_obj=os.getenv(\\"METAFLOW_ARGO_ERROR\\");data=json.loads(error_obj); print(data[\\"message\\"])'"""
        ]
        + [
            'if [ -n \\"${METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT}\\" ]; then eval \\"${METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT}\\"; fi'
        ]
    )

    # TODO: Also capture the first failed task id
    cmds = shlex.split('bash -c "%s"' % cmds)

    env = {
        # These values are needed by Metaflow to set its internal
        # state appropriately.
        "METAFLOW_CODE_URL": self.code_package_url,
        "METAFLOW_CODE_SHA": self.code_package_sha,
        "METAFLOW_CODE_DS": self.flow_datastore.TYPE,
        "METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
        "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
        "METAFLOW_USER": "argo-workflows",
        "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
        "METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
        "METAFLOW_OWNER": self.username,
    }
    # support Metaflow sandboxes
    env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT
    env["METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT"] = (
        ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT
    )
    # Argo substitutes these template expressions when the hook runs.
    env["METAFLOW_WORKFLOW_NAME"] = "{{workflow.name}}"
    env["METAFLOW_WORKFLOW_NAMESPACE"] = "{{workflow.namespace}}"
    env["METAFLOW_ARGO_WORKFLOW_FAILURES"] = "{{workflow.failures}}"

    # Build the skip-set once instead of once per env entry.
    env_vars_to_skip = set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
    env = {
        k: v
        for k, v in env.items()
        if v is not None and k not in env_vars_to_skip
    }

    # The decorator's `secrets` attribute may be unset/empty, a single name,
    # or a collection of names; normalise it to a list exactly once.
    step_secrets = resources.get("secrets") or []
    if isinstance(step_secrets, str):
        step_secrets = [step_secrets]
    # Empty entries (e.g. from splitting an empty config string) are dropped.
    secret_names = [
        str(name)
        for name in list(step_secrets)
        + KUBERNETES_SECRETS.split(",")
        + ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
        if name
    ]

    return [
        Template("error-msg-capture-hook")
        .service_account_name(resources["service_account"])
        .container(
            to_camelcase(
                kubernetes_sdk.V1Container(
                    name="main",
                    command=cmds,
                    image=resources["image"],
                    env=[
                        kubernetes_sdk.V1EnvVar(name=k, value=str(v))
                        for k, v in env.items()
                    ],
                    env_from=[
                        kubernetes_sdk.V1EnvFromSource(
                            secret_ref=kubernetes_sdk.V1SecretEnvSource(
                                name=secret_name,
                                # optional=True
                            )
                        )
                        for secret_name in secret_names
                    ],
                    resources=kubernetes_sdk.V1ResourceRequirements(
                        # NOTE: base resources for this are kept to a minimum
                        # to save on running costs. This can slow down startup
                        # of the hook container, which can be alleviated by
                        # using a base image that has the required
                        # dependencies pre-installed.
                        requests={
                            "cpu": "200m",
                            "memory": "100Mi",
                        },
                        limits={
                            "cpu": "200m",
                            "memory": "500Mi",
                        },
                    ),
                ).to_dict()
            )
        ),
        # Wrapper steps template: only run the capture container when the
        # workflow did not succeed.
        Template("capture-error-hook-fn-preflight").steps(
            [
                WorkflowStep()
                .name("capture-error-hook-fn-preflight")
                .template("error-msg-capture-hook")
                .when("{{workflow.status}} != Succeeded")
            ]
        ),
    ]