in metaflow/plugins/argo/argo_workflows.py [0:0]
def _compile_workflow_template(self):
    """Compile this Metaflow flow into an Argo ``WorkflowTemplate`` object.

    Builds (1) a set of ``metaflow/*`` annotations describing the schedule,
    parameters, tags, triggers, notification settings, and DAG structure,
    and (2) the full fluent-builder ``WorkflowTemplate`` spec: workflow-level
    metadata, run/pod metadata, parameters (flow parameters plus one
    auto-generated parameter per event trigger), exit/notification lifecycle
    hooks, and the DAG / container / exit-hook / daemon templates.

    Returns:
        WorkflowTemplate: the fully configured builder object, ready to be
        serialized and submitted to Argo.
    """
    # This method compiles a Metaflow FlowSpec into Argo WorkflowTemplate
    #
    # WorkflowTemplate
    # |
    # -- WorkflowSpec
    # |
    # -- Array<Template>
    # |
    # -- DAGTemplate, ContainerTemplate
    # | |
    # -- Array<DAGTask> |
    # | |
    # -- Template
    #
    # Steps in FlowSpec are represented as DAGTasks.
    # A DAGTask can reference to -
    # a ContainerTemplate (for linear steps..) or
    # another DAGTemplate (for nested `foreach`s).
    #
    # While we could have very well inlined container templates inside a DAGTask,
    # unfortunately Argo variable substitution ({{pod.name}}) doesn't work as
    # expected within DAGTasks
    # (https://github.com/argoproj/argo-workflows/issues/7432) and we are forced to
    # generate container templates at the top level (in WorkflowSpec) and maintain
    # references to them within the DAGTask.
    annotations = {}
    if self._schedule is not None:
        # timezone is an optional field and json dumps on None will result in null
        # hence configuring it to an empty string
        if self._timezone is None:
            self._timezone = ""
        cron_info = {"schedule": self._schedule, "tz": self._timezone}
        annotations.update({"metaflow/cron": json.dumps(cron_info)})
    if self.parameters:
        annotations.update({"metaflow/parameters": json.dumps(self.parameters)})
    # Some more annotations to populate the Argo UI nicely
    if self.tags:
        annotations.update({"metaflow/tags": json.dumps(self.tags)})
    if self.triggers:
        # Only name/type of each trigger is annotated, to keep the payload
        # small; missing keys serialize as null via trigger.get().
        annotations.update(
            {
                "metaflow/triggers": json.dumps(
                    [
                        {key: trigger.get(key) for key in ["name", "type"]}
                        for trigger in self.triggers
                    ]
                )
            }
        )
    if self.notify_on_error:
        # Record which notification channels are configured (booleans only,
        # never the secrets/URLs themselves).
        annotations.update(
            {
                "metaflow/notify_on_error": json.dumps(
                    {
                        "slack": bool(self.notify_slack_webhook_url),
                        "pager_duty": bool(self.notify_pager_duty_integration_key),
                        "incident_io": bool(self.notify_incident_io_api_key),
                    }
                )
            }
        )
    if self.notify_on_success:
        annotations.update(
            {
                "metaflow/notify_on_success": json.dumps(
                    {
                        "slack": bool(self.notify_slack_webhook_url),
                        "pager_duty": bool(self.notify_pager_duty_integration_key),
                        "incident_io": bool(self.notify_incident_io_api_key),
                    }
                )
            }
        )
    try:
        # Build the DAG based on the DAGNodes given by the FlowGraph for the found FlowSpec class.
        _steps_info, graph_structure = self.graph.output_steps()
        graph_info = {
            # for the time being, we only need the graph_structure. Being mindful of annotation size limits we do not include anything extra.
            "graph_structure": graph_structure
        }
    except Exception:
        # Best-effort: the DAG annotation is informational only, so any
        # failure to compute it degrades to json "null" rather than
        # aborting compilation.
        graph_info = None
    dag_annotation = {"metaflow/dag": json.dumps(graph_info)}
    return (
        WorkflowTemplate()
        .metadata(
            # Workflow Template metadata.
            # NOTE(review): repeated .annotations()/.labels() calls appear to
            # merge dicts into the metadata — confirm against the builder.
            ObjectMeta()
            .name(self.name)
            # Argo currently only supports Workflow-level namespaces. When v3.4.0
            # is released, we should be able to support multi-namespace /
            # multi-cluster scheduling.
            .namespace(KUBERNETES_NAMESPACE)
            .annotations(annotations)
            .annotations(self._base_annotations)
            .labels(self._base_labels)
            .label("app.kubernetes.io/name", "metaflow-flow")
            .annotations(dag_annotation)
        )
        .spec(
            WorkflowSpec()
            # Set overall workflow timeout.
            .active_deadline_seconds(self.workflow_timeout)
            # TODO: Allow Argo to optionally archive all workflow execution logs
            # It's disabled for now since it requires all Argo installations
            # to enable an artifactory repository. If log archival is
            # enabled in workflow controller, the logs for this workflow will
            # automatically get archived.
            # .archive_logs()
            # Don't automount service tokens for now - https://github.com/kubernetes/kubernetes/issues/16779#issuecomment-159656641
            # TODO: Service account names are currently set in the templates. We
            # can specify the default service account name here to reduce
            # the size of the generated YAML by a tiny bit.
            # .automount_service_account_token()
            # TODO: Support ImagePullSecrets for Argo & Kubernetes
            # Not strictly needed since a very valid workaround exists
            # https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#add-imagepullsecrets-to-a-service-account
            # .image_pull_secrets(...)
            # Limit workflow parallelism
            .parallelism(self.max_workers)
            # TODO: Support Prometheus metrics for Argo
            # .metrics(...)
            # TODO: Support PodGC and DisruptionBudgets
            .priority(self.workflow_priority)
            # Set workflow metadata
            .workflow_metadata(
                Metadata()
                .labels(self._base_labels)
                .label("app.kubernetes.io/name", "metaflow-run")
                .annotations(
                    {
                        **annotations,
                        **self._base_annotations,
                        **{"metaflow/run_id": "argo-{{workflow.name}}"},
                    }
                )
                # TODO: Set dynamic labels using labels_from. Ideally, we would
                # want to expose run_id as a label. It's easy to add labels,
                # but very difficult to remove them - let's err on the
                # conservative side and only add labels when we come across
                # use-cases for them.
            )
            # Handle parameters
            .arguments(
                Arguments().parameters(
                    [
                        Parameter(parameter["name"])
                        .value(
                            # JSON parameter values are single-quoted so Argo
                            # treats them as an opaque string.
                            "'%s'" % parameter["value"]
                            if parameter["type"] == "JSON"
                            else parameter["value"]
                        )
                        .description(parameter.get("description"))
                        # TODO: Better handle IncludeFile in Argo Workflows UI.
                        for parameter in self.parameters.values()
                    ]
                    + [
                        # Introduce non-required parameters for argo events so
                        # that the entire event payload can be accessed within the
                        # run. The parameter name is hashed to ensure that
                        # there won't be any collisions with Metaflow parameters.
                        Parameter(event["sanitized_name"])
                        .value(json.dumps(None))  # None in Argo Workflows world.
                        .description("auto-set by metaflow. safe to ignore.")
                        for event in self.triggers
                    ]
                )
            )
            # Set common pod metadata.
            .pod_metadata(
                Metadata()
                .labels(self._base_labels)
                .label("app.kubernetes.io/name", "metaflow-task")
                .annotations(
                    {
                        **annotations,
                        **self._base_annotations,
                        **{
                            "metaflow/run_id": "argo-{{workflow.name}}"
                        },  # we want pods of the workflow to have the run_id as an annotation as well
                    }
                )
            )
            # Set the entrypoint to flow name
            .entrypoint(self.flow.name)
            # OnExit hooks
            # NOTE(review): .onExit(None) is presumably a no-op in the
            # builder when error capture is disabled — confirm.
            .onExit(
                "capture-error-hook-fn-preflight"
                if self.enable_error_msg_capture
                else None
            )
            # Set exit hook handlers if notifications are enabled
            # Each hook only fires when its expression matches the terminal
            # workflow status; a channel's hooks are included only when both
            # the notify flag and that channel's credential are set.
            .hooks(
                {
                    **(
                        {
                            # workflow status maps to Completed
                            "notify-slack-on-success": LifecycleHook()
                            .expression("workflow.status == 'Succeeded'")
                            .template("notify-slack-on-success"),
                        }
                        if self.notify_on_success and self.notify_slack_webhook_url
                        else {}
                    ),
                    **(
                        {
                            # workflow status maps to Completed
                            "notify-pager-duty-on-success": LifecycleHook()
                            .expression("workflow.status == 'Succeeded'")
                            .template("notify-pager-duty-on-success"),
                        }
                        if self.notify_on_success
                        and self.notify_pager_duty_integration_key
                        else {}
                    ),
                    **(
                        {
                            # workflow status maps to Completed
                            "notify-incident-io-on-success": LifecycleHook()
                            .expression("workflow.status == 'Succeeded'")
                            .template("notify-incident-io-on-success"),
                        }
                        if self.notify_on_success
                        and self.notify_incident_io_api_key
                        else {}
                    ),
                    **(
                        {
                            # workflow status maps to Failed or Error
                            "notify-slack-on-failure": LifecycleHook()
                            .expression("workflow.status == 'Failed'")
                            .template("notify-slack-on-error"),
                            "notify-slack-on-error": LifecycleHook()
                            .expression("workflow.status == 'Error'")
                            .template("notify-slack-on-error"),
                        }
                        if self.notify_on_error and self.notify_slack_webhook_url
                        else {}
                    ),
                    **(
                        {
                            # workflow status maps to Failed or Error
                            "notify-pager-duty-on-failure": LifecycleHook()
                            .expression("workflow.status == 'Failed'")
                            .template("notify-pager-duty-on-error"),
                            "notify-pager-duty-on-error": LifecycleHook()
                            .expression("workflow.status == 'Error'")
                            .template("notify-pager-duty-on-error"),
                        }
                        if self.notify_on_error
                        and self.notify_pager_duty_integration_key
                        else {}
                    ),
                    **(
                        {
                            # workflow status maps to Failed or Error
                            "notify-incident-io-on-failure": LifecycleHook()
                            .expression("workflow.status == 'Failed'")
                            .template("notify-incident-io-on-error"),
                            "notify-incident-io-on-error": LifecycleHook()
                            .expression("workflow.status == 'Error'")
                            .template("notify-incident-io-on-error"),
                        }
                        if self.notify_on_error and self.notify_incident_io_api_key
                        else {}
                    ),
                    # Warning: terrible hack to workaround a bug in Argo Workflow
                    # where the hooks listed above do not execute unless
                    # there is an explicit exit hook. as and when this
                    # bug is patched, we should remove this effectively
                    # no-op hook.
                    **(
                        {"exit": LifecycleHook().template("exit-hook-hack")}
                        if self.notify_on_error or self.notify_on_success
                        else {}
                    ),
                }
            )
            # Top-level DAG template(s)
            .templates(self._dag_templates())
            # Container templates
            .templates(self._container_templates())
            # Exit hook template(s)
            .templates(self._exit_hook_templates())
            # Sidecar templates (Daemon Containers)
            .templates(self._daemon_templates())
        )
    )