def _process_triggers()

in metaflow/plugins/argo/argo_workflows.py [0:0]


    def _process_triggers(self):
        # Impute triggers for Argo Workflow Template specified through @trigger and
        # @trigger_on_finish decorators

        # Disallow usage of @trigger and @trigger_on_finish together for now.
        if self.flow._flow_decorators.get("trigger") and self.flow._flow_decorators.get(
            "trigger_on_finish"
        ):
            raise ArgoWorkflowsException(
                "Argo Workflows doesn't support both *@trigger* and "
                "*@trigger_on_finish* decorators concurrently yet. Use one or the "
                "other for now."
            )
        triggers = []
        options = None

        # @trigger decorator
        if self.flow._flow_decorators.get("trigger"):
            # Parameters are not duplicated, and exist in the flow. Additionally,
            # convert them to lower case since Metaflow parameters are case
            # insensitive.
            seen = set()
            # NOTE: We skip config parameters as their values can not be set through event payloads
            params = set(
                [
                    param.name.lower()
                    for var, param in self.flow._get_parameters()
                    if not param.IS_CONFIG_PARAMETER
                ]
            )
            trigger_deco = self.flow._flow_decorators.get("trigger")[0]
            trigger_deco.format_deploytime_value()
            for event in trigger_deco.triggers:
                parameters = {}
                # TODO: Add a check to guard against names starting with numerals(?)
                if not re.match(r"^[A-Za-z0-9_.-]+$", event["name"]):
                    raise ArgoWorkflowsException(
                        "Invalid event name *%s* in *@trigger* decorator. Only "
                        "alphanumeric characters, underscores(_), dashes(-) and "
                        "dots(.) are allowed." % event["name"]
                    )
                for key, value in event.get("parameters", {}).items():
                    if not re.match(r"^[A-Za-z0-9_]+$", value):
                        raise ArgoWorkflowsException(
                            "Invalid event payload key *%s* for event *%s* in "
                            "*@trigger* decorator. Only alphanumeric characters and "
                            "underscores(_) are allowed." % (value, event["name"])
                        )
                    if key.lower() not in params:
                        raise ArgoWorkflowsException(
                            "Parameter *%s* defined in the event mappings for "
                            "*@trigger* decorator not found in the flow." % key
                        )
                    if key.lower() in seen:
                        raise ArgoWorkflowsException(
                            "Duplicate entries for parameter *%s* defined in the "
                            "event mappings for *@trigger* decorator." % key.lower()
                        )
                    seen.add(key.lower())
                    parameters[key.lower()] = value
                event["parameters"] = parameters
                event["type"] = "event"
            triggers.extend(self.flow._flow_decorators.get("trigger")[0].triggers)

            # Set automatic parameter mapping iff only a single event dependency is
            # specified with no explicit parameter mapping.
            if len(triggers) == 1 and not triggers[0].get("parameters"):
                triggers[0]["parameters"] = dict(zip(params, params))
            options = self.flow._flow_decorators.get("trigger")[0].options

        # @trigger_on_finish decorator
        if self.flow._flow_decorators.get("trigger_on_finish"):
            trigger_on_finish_deco = self.flow._flow_decorators.get(
                "trigger_on_finish"
            )[0]
            trigger_on_finish_deco.format_deploytime_value()
            for event in trigger_on_finish_deco.triggers:
                # Actual filters are deduced here since we don't have access to
                # the current object in the @trigger_on_finish decorator.
                project_name = event.get("project") or current.get("project_name")
                branch_name = event.get("branch") or current.get("branch_name")
                # validate that we have complete project info for an event name
                if project_name or branch_name:
                    if not (project_name and branch_name):
                        # if one of the two is missing, we would end up listening to an event that will never be broadcast.
                        raise ArgoWorkflowsException(
                            "Incomplete project info. Please specify both 'project' and 'project_branch' or use the @project decorator"
                        )

                triggers.append(
                    {
                        # Make sure this remains consistent with the event name format
                        # in ArgoWorkflowsInternalDecorator.
                        "name": "metaflow.%s.end"
                        % ".".join(
                            v
                            for v in [
                                project_name,
                                branch_name,
                                event["flow"],
                            ]
                            if v
                        ),
                        "filters": {
                            "auto-generated-by-metaflow": True,
                            "project_name": project_name,
                            "branch_name": branch_name,
                            # TODO: Add a time filters to guard against cached events
                        },
                        "type": "run",
                        "flow": event["flow"],
                    }
                )
            options = self.flow._flow_decorators.get("trigger_on_finish")[0].options

        for event in triggers:
            # Assign a sanitized name since we need this at many places to please
            # Argo Events sensors. There is a slight possibility of name collision
            # but quite unlikely for us to worry about at this point.
            event["sanitized_name"] = "%s_%s" % (
                event["name"]
                .replace(".", "")
                .replace("-", "")
                .replace("@", "")
                .replace("+", ""),
                to_unicode(base64.b32encode(sha1(to_bytes(event["name"])).digest()))[
                    :4
                ].lower(),
            )
        return triggers, options