in metaflow/plugins/argo/argo_workflows.py [0:0]
def _compile_sensor(self):
# This method compiles a Metaflow @trigger decorator into an Argo Events Sensor.
#
# The event payload is assumed to have the following shape -
# ----------------------------------------------------------------------
# | name                 | name of the event                            |
# | payload              |                                              |
# |   parameter name ... | parameter value                              |
# |   parameter name ... | parameter value                              |
# ----------------------------------------------------------------------
#
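# As a concrete illustration (all field values hypothetical), the body of
# a published event could look like -
#
#   {
#     "payload": {
#       "name": "data_updated",   # Metaflow event name
#       "date": "2023-04-01",     # parameter field
#       "alpha": "0.5"            # parameter field
#     }
#   }
#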
# At the moment, every event-triggered workflow template has a dedicated
# sensor (which can potentially be a bit wasteful in scenarios with high
# volume of workflows and low volume of events) - introducing a many-to-one
# sensor-to-workflow-template solution is completely in the realm of
# possibilities (modulo consistency and transactional guarantees).
#
# This implementation side-steps the more prominent/popular usage of event
# sensors where the sensor is responsible for submitting the workflow object
# directly. Instead we construct the equivalent behavior of `argo submit
# --from` to reference an already submitted workflow template. This ensures
# that Metaflow generated Kubernetes objects can be easily reasoned about.
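#
# Concretely, the generated trigger behaves roughly like -
#
#   argo submit --from workflowtemplate/<flow-name> -n <namespace>
#
# against the already deployed workflow template.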
#
# At the moment, Metaflow configures sensors for webhook and NATS event
# sources. If you are interested in the HA story for either, please see
# https://argoproj.github.io/argo-events/eventsources/ha/.
#
# There is some potential for confusion between Metaflow concepts and Argo
# Events concepts, particularly around event names. An Argo Events
# EventSource defines an event name which is different from the Metaflow
# event name - think of the Argo Events name as a type of event
# (conceptually like topics in Kafka) while Metaflow event names are a
# field within the Argo Event.
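#
# For example (hypothetical names), a single EventSource/event pair such
# as webhook/metaflow-event can carry many distinct Metaflow events -
# data_updated, model_refreshed, ... - each identified by the name field
# in the payload.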
#
# At the moment, there is parity between the labels and annotations for
# workflow templates and sensors - that may or may not be the case in the
# future.
#
# Unfortunately, there doesn't seem to be a way to create a sensor filter
# where one (or more) fields across multiple events have the same value.
# Imagine a scenario where we want to trigger a flow iff both of the
# dependent events agree on the same date field - there isn't any way in
# Argo Events (as of Apr '23) to ensure that.
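# In other words, a cross-event constraint like -
#
#   event_a.payload.date == event_b.payload.date
#
# (hypothetical event names) cannot be expressed in a sensor filter today.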
# Nothing to do here - let's short circuit and exit.
if not self.triggers:
return {}
# Ensure proper configuration is available for Argo Events
if ARGO_EVENTS_EVENT is None:
raise ArgoWorkflowsException(
"An Argo Event name hasn't been configured for your deployment yet. "
"Please see this article for more details on event names - "
"https://argoproj.github.io/argo-events/eventsources/naming/. "
"It is very likely that all events for your deployment share the "
"same name. You can configure it by executing "
"`metaflow configure kubernetes` or setting METAFLOW_ARGO_EVENTS_EVENT "
"in your configuration. If in doubt, reach out for support at "
"http://chat.metaflow.org"
)
# Unfortunately, Argo Events requires knowledge of the event source today.
# Hopefully, some day this requirement can be removed and events can be
# truly agnostic to their source and destination.
if ARGO_EVENTS_EVENT_SOURCE is None:
raise ArgoWorkflowsException(
"An Argo Event Source name hasn't been configured for your deployment "
"yet. Please see this article for more details on event names - "
"https://argoproj.github.io/argo-events/eventsources/naming/. "
"You can configure it by executing `metaflow configure kubernetes` or "
"setting METAFLOW_ARGO_EVENTS_EVENT_SOURCE in your configuration. If "
"in doubt, reach out for support at http://chat.metaflow.org"
)
# Service accounts are a hard requirement since we utilize the
# argoWorkflow trigger for resource sensors today.
if ARGO_EVENTS_SERVICE_ACCOUNT is None:
raise ArgoWorkflowsException(
"An Argo Event service account hasn't been configured for your "
"deployment yet. Please see this article for more details on event "
"names - https://argoproj.github.io/argo-events/service-accounts/. "
"You can configure it by executing `metaflow configure kubernetes` or "
"setting METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT in your configuration. "
"If in doubt, reach out for support at http://chat.metaflow.org"
)
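# As an illustration, the relevant Metaflow config (with hypothetical
# values) could look like -
#
#   {
#       "METAFLOW_ARGO_EVENTS_EVENT": "metaflow-event",
#       "METAFLOW_ARGO_EVENTS_EVENT_SOURCE": "argo-events-webhook",
#       "METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT": "operate-workflow-sa"
#   }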
try:
# Kubernetes is a soft dependency for generating Argo objects.
# We could very well remove this dependency for Argo with the downside of
# adding a bunch more JSON bloat classes (looking at you... V1Container).
from kubernetes import client as kubernetes_sdk
except (NameError, ImportError):
raise MetaflowException(
"Could not import Python package 'kubernetes'. Install kubernetes "
"sdk (https://pypi.org/project/kubernetes/) first."
)
return (
Sensor()
.metadata(
# Sensor metadata.
ObjectMeta()
.name(ArgoWorkflows._sensor_name(self.name))
.namespace(KUBERNETES_NAMESPACE)
.labels(self._base_labels)
.label("app.kubernetes.io/name", "metaflow-sensor")
.annotations(self._base_annotations)
)
.spec(
SensorSpec().template(
# Sensor template.
SensorTemplate()
.metadata(
ObjectMeta()
.label("app.kubernetes.io/name", "metaflow-sensor")
.label("app.kubernetes.io/part-of", "metaflow")
.annotations(self._base_annotations)
)
.container(
# Run the sensor with guaranteed QoS (requests == limits). The
# sensor isn't doing a lot of work, so we roll with a minimal
# resource allocation. It is likely that in subsequent releases
# we will aggressively lower sensor resources to pack more of
# them on a single node.
to_camelcase(
kubernetes_sdk.V1Container(
name="main",
resources=kubernetes_sdk.V1ResourceRequirements(
requests={
"cpu": "100m",
"memory": "250Mi",
},
limits={
"cpu": "100m",
"memory": "250Mi",
},
),
).to_dict()
)
)
.service_account_name(ARGO_EVENTS_SERVICE_ACCOUNT)
# TODO (savin): Handle bypassing docker image rate limit errors.
)
# Set sensor replica to 1 for now.
# TODO (savin): Allow for multiple replicas for HA.
.replicas(1)
# TODO: Support revision history limit to manage old deployments
# .revision_history_limit(...)
.event_bus_name(ARGO_EVENTS_EVENT_BUS)
# Workflow trigger.
.trigger(
Trigger().template(
TriggerTemplate(self.name)
# Trigger a deployed workflow template
.argo_workflow_trigger(
ArgoWorkflowTrigger()
.source(
{
"resource": {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": {
"generateName": "%s-" % self.name,
"namespace": KUBERNETES_NAMESPACE,
# Useful for painting the UI.
"annotations": {
"metaflow/triggered_by": json.dumps(
[
{
key: trigger.get(key)
for key in ["name", "type"]
}
for trigger in self.triggers
]
)
},
},
"spec": {
"arguments": {
"parameters": [
Parameter(parameter["name"])
.value(parameter["value"])
.to_json()
for parameter in self.parameters.values()
]
# Also consume event data
+ [
Parameter(event["sanitized_name"])
.value(json.dumps(None))
.to_json()
for event in self.triggers
]
},
"workflowTemplateRef": {
"name": self.name,
},
},
}
}
)
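# For a (hypothetical) flow deployed as 'demoflow', the source
# above renders roughly to -
#
#   apiVersion: argoproj.io/v1alpha1
#   kind: Workflow
#   metadata:
#     generateName: demoflow-
#     namespace: <namespace>
#   spec:
#     arguments:
#       parameters: [...]
#     workflowTemplateRef:
#       name: demoflow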
.parameters(
    [
        TriggerParameter()
        .src(
            dependency_name=event["sanitized_name"],
            # Technically, we don't need to create a payload
            # carry-on and can stuff everything within the body.
            # NOTE: We need the conditional logic in order to
            # successfully fall back to the default value when
            # the event payload does not contain a key for a
            # parameter.
            # NOTE: Keys might contain dashes, so use the safer
            # 'get' for fetching the value.
            data_template='{{ if (hasKey $.Input.body.payload "%s") }}{{- (get $.Input.body.payload "%s" %s) -}}{{- else -}}{{ (fail "use-default-instead") }}{{- end -}}'
            % (
                v,
                v,
                (
                    "| toRawJson | squote"
                    if self.parameters[parameter_name]["type"] == "JSON"
                    else "| toRawJson"
                ),
            ),
            # Unfortunately, the sensor needs to record the
            # default values for the parameters - there doesn't
            # seem to be any way for us to skip them.
            value=(
                json.dumps(self.parameters[parameter_name]["value"])
                if self.parameters[parameter_name]["type"] == "JSON"
                else self.parameters[parameter_name]["value"]
            ),
        )
        .dest(
            # This undocumented (mis?)feature in argo-events
            # allows us to reference parameters by name rather
            # than index.
            "spec.arguments.parameters.#(name=%s).value" % parameter_name
        )
        for event in self.triggers
        for parameter_name, v in event.get("parameters", {}).items()
    ]
+ [
# Map the full event payload to the parameter named after
# the current event.
TriggerParameter()
.src(
dependency_name=event["sanitized_name"],
data_key="body.payload",
value=json.dumps(None),
)
.dest(
"spec.arguments.parameters.#(name=%s).value"
% event["sanitized_name"]
)
for event in self.triggers
]
)
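# As an illustration, for a (hypothetical) non-JSON parameter mapped
# from the event field 'date', the data template above renders to -
#
#   {{ if (hasKey $.Input.body.payload "date") }}
#   {{- (get $.Input.body.payload "date" | toRawJson) -}}
#   {{- else -}}{{ (fail "use-default-instead") }}{{- end -}}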
# Reset trigger conditions every so often by wiping away event
# tracking history on a schedule -
# @trigger(options={"reset_at": {"cron": ..., "timezone": ...}})
# where timezone is an IANA timezone, e.g. America/Los_Angeles.
# TODO: Introduce "end_of_day", "end_of_hour" ..
).conditions_reset(
cron=self.trigger_options.get("reset_at", {}).get("cron"),
timezone=self.trigger_options.get("reset_at", {}).get(
"timezone"
),
)
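# As a usage sketch (hypothetical event name and schedule), a flow
# decorated with -
#
#   @trigger(event="data_updated",
#            options={"reset_at": {"cron": "0 0 * * *",
#                                  "timezone": "America/Los_Angeles"}})
#
# would have its tracked event history wiped at midnight PT every day.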
)
)
# Event dependencies. As of Mar '23, Argo Events docs suggest using the
# JetStream event bus rather than the NATS streaming bus since the latter
# doesn't support multiple combos of the same event name and event
# source name.
.dependencies(
# Event dependency names don't entertain dots, hence the sanitized name.
EventDependency(event["sanitized_name"]).event_name(
ARGO_EVENTS_EVENT
)
# TODO: Alternatively fetch this from @trigger config options
.event_source_name(ARGO_EVENTS_EVENT_SOURCE).filters(
# Ensure that the event name matches and that all required parameter
# fields are present in the payload. Note that it is possible to depend
# on an event where none of the fields are required; at the moment the
# event itself is still required, but that restriction can be removed
# if needed.
EventDependencyFilter().exprs(
[
{
"expr": "name == '%s'" % event["name"],
"fields": [
{"name": "name", "path": "body.payload.name"}
],
}
]
+ [
{
"expr": "true == true", # field name is present
"fields": [
{
"name": "field",
"path": "body.payload.%s" % v,
}
],
}
for parameter_name, v in event.get(
"parameters", {}
).items()
# only for required parameters
if self.parameters[parameter_name]["is_required"]
]
+ [
{
"expr": "field == '%s'" % v, # trigger_on_finish
"fields": [
{
"name": "field",
"path": "body.payload.%s" % filter_key,
}
],
}
for filter_key, v in event.get("filters", {}).items()
if v
]
)
)
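# As an illustration, for a (hypothetical) event 'data_updated' with
# a required parameter mapped from the field 'date', the filter above
# renders roughly to -
#
#   exprs:
#     - expr: name == 'data_updated'
#       fields:
#         - name: name
#           path: body.payload.name
#     - expr: true == true
#       fields:
#         - name: field
#           path: body.payload.date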
for event in self.triggers
)
)
)