in metaflow/plugins/argo/argo_workflows.py [0:0]
def _process_triggers(self):
# Impute triggers for Argo Workflow Template specified through @trigger and
# @trigger_on_finish decorators
# Disallow usage of @trigger and @trigger_on_finish together for now.
if self.flow._flow_decorators.get("trigger") and self.flow._flow_decorators.get(
"trigger_on_finish"
):
raise ArgoWorkflowsException(
"Argo Workflows doesn't support both *@trigger* and "
"*@trigger_on_finish* decorators concurrently yet. Use one or the "
"other for now."
)
triggers = []
options = None
# @trigger decorator
if self.flow._flow_decorators.get("trigger"):
# Parameters are not duplicated, and exist in the flow. Additionally,
# convert them to lower case since Metaflow parameters are case
# insensitive.
seen = set()
# NOTE: We skip config parameters as their values can not be set through event payloads
params = set(
[
param.name.lower()
for var, param in self.flow._get_parameters()
if not param.IS_CONFIG_PARAMETER
]
)
trigger_deco = self.flow._flow_decorators.get("trigger")[0]
trigger_deco.format_deploytime_value()
for event in trigger_deco.triggers:
parameters = {}
# TODO: Add a check to guard against names starting with numerals(?)
if not re.match(r"^[A-Za-z0-9_.-]+$", event["name"]):
raise ArgoWorkflowsException(
"Invalid event name *%s* in *@trigger* decorator. Only "
"alphanumeric characters, underscores(_), dashes(-) and "
"dots(.) are allowed." % event["name"]
)
for key, value in event.get("parameters", {}).items():
if not re.match(r"^[A-Za-z0-9_]+$", value):
raise ArgoWorkflowsException(
"Invalid event payload key *%s* for event *%s* in "
"*@trigger* decorator. Only alphanumeric characters and "
"underscores(_) are allowed." % (value, event["name"])
)
if key.lower() not in params:
raise ArgoWorkflowsException(
"Parameter *%s* defined in the event mappings for "
"*@trigger* decorator not found in the flow." % key
)
if key.lower() in seen:
raise ArgoWorkflowsException(
"Duplicate entries for parameter *%s* defined in the "
"event mappings for *@trigger* decorator." % key.lower()
)
seen.add(key.lower())
parameters[key.lower()] = value
event["parameters"] = parameters
event["type"] = "event"
triggers.extend(self.flow._flow_decorators.get("trigger")[0].triggers)
# Set automatic parameter mapping iff only a single event dependency is
# specified with no explicit parameter mapping.
if len(triggers) == 1 and not triggers[0].get("parameters"):
triggers[0]["parameters"] = dict(zip(params, params))
options = self.flow._flow_decorators.get("trigger")[0].options
# @trigger_on_finish decorator
if self.flow._flow_decorators.get("trigger_on_finish"):
trigger_on_finish_deco = self.flow._flow_decorators.get(
"trigger_on_finish"
)[0]
trigger_on_finish_deco.format_deploytime_value()
for event in trigger_on_finish_deco.triggers:
# Actual filters are deduced here since we don't have access to
# the current object in the @trigger_on_finish decorator.
project_name = event.get("project") or current.get("project_name")
branch_name = event.get("branch") or current.get("branch_name")
# validate that we have complete project info for an event name
if project_name or branch_name:
if not (project_name and branch_name):
# if one of the two is missing, we would end up listening to an event that will never be broadcast.
raise ArgoWorkflowsException(
"Incomplete project info. Please specify both 'project' and 'project_branch' or use the @project decorator"
)
triggers.append(
{
# Make sure this remains consistent with the event name format
# in ArgoWorkflowsInternalDecorator.
"name": "metaflow.%s.end"
% ".".join(
v
for v in [
project_name,
branch_name,
event["flow"],
]
if v
),
"filters": {
"auto-generated-by-metaflow": True,
"project_name": project_name,
"branch_name": branch_name,
# TODO: Add a time filters to guard against cached events
},
"type": "run",
"flow": event["flow"],
}
)
options = self.flow._flow_decorators.get("trigger_on_finish")[0].options
for event in triggers:
# Assign a sanitized name since we need this at many places to please
# Argo Events sensors. There is a slight possibility of name collision
# but quite unlikely for us to worry about at this point.
event["sanitized_name"] = "%s_%s" % (
event["name"]
.replace(".", "")
.replace("-", "")
.replace("@", "")
.replace("+", ""),
to_unicode(base64.b32encode(sha1(to_bytes(event["name"])).digest()))[
:4
].lower(),
)
return triggers, options