def _compile_sensor()

in metaflow/plugins/argo/argo_workflows.py [0:0]


    def _compile_sensor(self):
        """Compile the flow's ``@trigger`` specification into an Argo Events Sensor.

        Returns
        -------
        An empty dict when the flow declares no triggers; otherwise a
        ``Sensor`` object that references the already-deployed workflow
        template (the equivalent of ``argo submit --from``).

        Raises
        ------
        ArgoWorkflowsException
            If ``ARGO_EVENTS_EVENT``, ``ARGO_EVENTS_EVENT_SOURCE`` or
            ``ARGO_EVENTS_SERVICE_ACCOUNT`` has not been configured.
        MetaflowException
            If the ``kubernetes`` Python package is not installed.
        """
        # This method compiles a Metaflow @trigger decorator into Argo Events Sensor.
        #
        # Event payload is assumed as -
        # ----------------------------------------------------------------------
        # | name                   | name of the event                         |
        # | payload                |                                           |
        # |     parameter name...  |  parameter value                          |
        # |     parameter name...  |  parameter value                          |
        # |     parameter name...  |  parameter value                          |
        # |     parameter name...  |  parameter value                          |
        # ----------------------------------------------------------------------
        #
        #
        #
        # At the moment, every event-triggered workflow template has a dedicated
        # sensor (which can potentially be a bit wasteful in scenarios with high
        # volume of workflows and low volume of events) - introducing a many-to-one
        # sensor-to-workflow-template solution is completely in the realm of
        # possibilities (modulo consistency and transactional guarantees).
        #
        # This implementation side-steps the more prominent/popular usage of event
        # sensors where the sensor is responsible for submitting the workflow object
        # directly. Instead we construct the equivalent behavior of `argo submit
        # --from` to reference an already submitted workflow template. This ensures
        # that Metaflow generated Kubernetes objects can be easily reasoned about.
        #
        # At the moment, Metaflow configures for webhook and NATS event sources. If you
        # are interested in the HA story for either - please follow this link
        # https://argoproj.github.io/argo-events/eventsources/ha/.
        #
        # There is some potential for confusion between Metaflow concepts and Argo
        # Events concepts, particularly for event names. Argo Events EventSource
        # define an event name which is different than the Metaflow event name - think
        # of Argo Events name as a type of event (conceptually like topics in Kafka)
        # while Metaflow event names are a field within the Argo Event.
        #
        #
        # At the moment, there is parity between the labels and annotations for
        # workflow templates and sensors - that may or may not be the case in the
        # future.
        #
        # Unfortunately, there doesn't seem to be a way to create a sensor filter
        # where one (or more) fields across multiple events have the same value.
        # Imagine a scenario where we want to trigger a flow iff both the dependent
        # events agree on the same date field. Unfortunately, there isn't any way in
        # Argo Events (as of apr'23) to ensure that.

        # Nothing to do here - let's short circuit and exit.
        if not self.triggers:
            return {}

        # Ensure proper configuration is available for Argo Events
        if ARGO_EVENTS_EVENT is None:
            raise ArgoWorkflowsException(
                "An Argo Event name hasn't been configured for your deployment yet. "
                "Please see this article for more details on event names - "
                "https://argoproj.github.io/argo-events/eventsources/naming/. "
                "It is very likely that all events for your deployment share the "
                "same name. You can configure it by executing "
                "`metaflow configure kubernetes` or setting METAFLOW_ARGO_EVENTS_EVENT "
                "in your configuration. If in doubt, reach out for support at "
                "http://chat.metaflow.org"
            )
        # Unfortunately argo events requires knowledge of event source today.
        # Hopefully, some day this requirement can be removed and events can be truly
        # impervious to their source and destination.
        if ARGO_EVENTS_EVENT_SOURCE is None:
            raise ArgoWorkflowsException(
                "An Argo Event Source name hasn't been configured for your deployment "
                "yet. Please see this article for more details on event names - "
                "https://argoproj.github.io/argo-events/eventsources/naming/. "
                "You can configure it by executing `metaflow configure kubernetes` or "
                "setting METAFLOW_ARGO_EVENTS_EVENT_SOURCE in your configuration. If "
                "in doubt, reach out for support at http://chat.metaflow.org"
            )
        # Service accounts are a hard requirement since we utilize the
        # argoWorkflow trigger for resource sensors today.
        if ARGO_EVENTS_SERVICE_ACCOUNT is None:
            raise ArgoWorkflowsException(
                "An Argo Event service account hasn't been configured for your "
                "deployment yet. Please see this article for more details on event "
                "names - https://argoproj.github.io/argo-events/service-accounts/. "
                "You can configure it by executing `metaflow configure kubernetes` or "
                "setting METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT in your configuration. "
                "If in doubt, reach out for support at http://chat.metaflow.org"
            )

        try:
            # Kubernetes is a soft dependency for generating Argo objects.
            # We can very well remove this dependency for Argo with the downside of
            # adding a bunch more json bloat classes (looking at you... V1Container)
            from kubernetes import client as kubernetes_sdk
        except (NameError, ImportError):
            raise MetaflowException(
                "Could not import Python package 'kubernetes'. Install kubernetes "
                "sdk (https://pypi.org/project/kubernetes/) first."
            )

        return (
            Sensor()
            .metadata(
                # Sensor metadata.
                ObjectMeta()
                .name(ArgoWorkflows._sensor_name(self.name))
                .namespace(KUBERNETES_NAMESPACE)
                .labels(self._base_labels)
                .label("app.kubernetes.io/name", "metaflow-sensor")
                .annotations(self._base_annotations)
            )
            .spec(
                SensorSpec().template(
                    # Sensor template.
                    SensorTemplate()
                    .metadata(
                        ObjectMeta()
                        .label("app.kubernetes.io/name", "metaflow-sensor")
                        .label("app.kubernetes.io/part-of", "metaflow")
                        .annotations(self._base_annotations)
                    )
                    .container(
                        # Run sensor in guaranteed QoS. The sensor isn't doing a lot
                        # of work so we roll with minimal resource allocation. It is
                        # likely that in subsequent releases we will aggressively lower
                        # sensor resources to pack more of them on a single node.
                        # Note: requests == limits yields the "Guaranteed" QoS class.
                        to_camelcase(
                            kubernetes_sdk.V1Container(
                                name="main",
                                resources=kubernetes_sdk.V1ResourceRequirements(
                                    requests={
                                        "cpu": "100m",
                                        "memory": "250Mi",
                                    },
                                    limits={
                                        "cpu": "100m",
                                        "memory": "250Mi",
                                    },
                                ),
                            ).to_dict()
                        )
                    )
                    .service_account_name(ARGO_EVENTS_SERVICE_ACCOUNT)
                    # TODO (savin): Handle bypassing docker image rate limit errors.
                )
                # Set sensor replica to 1 for now.
                # TODO (savin): Allow for multiple replicas for HA.
                .replicas(1)
                # TODO: Support revision history limit to manage old deployments
                # .revision_history_limit(...)
                .event_bus_name(ARGO_EVENTS_EVENT_BUS)
                # Workflow trigger.
                .trigger(
                    Trigger().template(
                        TriggerTemplate(self.name)
                        # Trigger a deployed workflow template
                        .argo_workflow_trigger(
                            ArgoWorkflowTrigger()
                            .source(
                                # Inline Workflow resource that references the deployed
                                # workflow template by name (workflowTemplateRef), i.e.
                                # the `argo submit --from` equivalent.
                                {
                                    "resource": {
                                        "apiVersion": "argoproj.io/v1alpha1",
                                        "kind": "Workflow",
                                        "metadata": {
                                            "generateName": "%s-" % self.name,
                                            "namespace": KUBERNETES_NAMESPACE,
                                            # Useful to paint the UI
                                            "annotations": {
                                                "metaflow/triggered_by": json.dumps(
                                                    [
                                                        {
                                                            key: trigger.get(key)
                                                            for key in ["name", "type"]
                                                        }
                                                        for trigger in self.triggers
                                                    ]
                                                )
                                            },
                                        },
                                        "spec": {
                                            "arguments": {
                                                "parameters": [
                                                    Parameter(parameter["name"])
                                                    .value(parameter["value"])
                                                    .to_json()
                                                    for parameter in self.parameters.values()
                                                ]
                                                # Also consume event data through a
                                                # per-event parameter; "null" here is a
                                                # placeholder overwritten by the
                                                # trigger parameter mapping below.
                                                + [
                                                    Parameter(event["sanitized_name"])
                                                    .value(json.dumps(None))
                                                    .to_json()
                                                    for event in self.triggers
                                                ]
                                            },
                                            "workflowTemplateRef": {
                                                "name": self.name,
                                            },
                                        },
                                    }
                                }
                            )
                            .parameters(
                                # Flatten the per-event lists of TriggerParameters
                                # (one TriggerParameter per mapped parameter, per
                                # event) into a single list.
                                [
                                    y
                                    for x in list(
                                        list(
                                            TriggerParameter()
                                            .src(
                                                dependency_name=event["sanitized_name"],
                                                # Technically, we don't need to create
                                                # a payload carry-on and can stuff
                                                # everything within the body.
                                                # NOTE: We need the conditional logic in order to successfully fall back to the default value
                                                # when the event payload does not contain a key for a parameter.
                                                # The sprig `fail` call makes template
                                                # evaluation error out, which causes
                                                # Argo Events to fall back to `value`.
                                                # NOTE: Keys might contain dashes, so use the safer 'get' for fetching the value
                                                data_template='{{ if (hasKey $.Input.body.payload "%s") }}{{- (get $.Input.body.payload "%s" %s) -}}{{- else -}}{{ (fail "use-default-instead") }}{{- end -}}'
                                                % (
                                                    v,
                                                    v,
                                                    (
                                                        # JSON parameters get an extra
                                                        # squote so the value survives
                                                        # as a quoted string.
                                                        "| toRawJson | squote"
                                                        if self.parameters[
                                                            parameter_name
                                                        ]["type"]
                                                        == "JSON"
                                                        else "| toRawJson"
                                                    ),
                                                ),
                                                # Unfortunately the sensor needs to
                                                # record the default values for
                                                # the parameters - there doesn't seem
                                                # to be any way for us to skip
                                                value=(
                                                    json.dumps(
                                                        self.parameters[parameter_name][
                                                            "value"
                                                        ]
                                                    )
                                                    if self.parameters[parameter_name][
                                                        "type"
                                                    ]
                                                    == "JSON"
                                                    else self.parameters[
                                                        parameter_name
                                                    ]["value"]
                                                ),
                                            )
                                            .dest(
                                                # this undocumented (mis?)feature in
                                                # argo-events allows us to reference
                                                # parameters by name rather than index
                                                "spec.arguments.parameters.#(name=%s).value"
                                                % parameter_name
                                            )
                                            for parameter_name, v in event.get(
                                                "parameters", {}
                                            ).items()
                                        )
                                        for event in self.triggers
                                    )
                                    for y in x
                                ]
                                + [
                                    # Map event payload to parameters for current
                                    # run - fills in the per-event placeholder
                                    # parameter declared in the resource above.
                                    TriggerParameter()
                                    .src(
                                        dependency_name=event["sanitized_name"],
                                        data_key="body.payload",
                                        value=json.dumps(None),
                                    )
                                    .dest(
                                        "spec.arguments.parameters.#(name=%s).value"
                                        % event["sanitized_name"]
                                    )
                                    for event in self.triggers
                                ]
                            )
                            # Reset trigger conditions ever so often by wiping
                            # away event tracking history on a schedule.
                            # @trigger(options={"reset_at": {"cron": , "timezone": }})
                            # timezone is IANA standard, e.g. America/Los_Angeles
                            # TODO: Introduce "end_of_day", "end_of_hour" ..
                        ).conditions_reset(
                            cron=self.trigger_options.get("reset_at", {}).get("cron"),
                            timezone=self.trigger_options.get("reset_at", {}).get(
                                "timezone"
                            ),
                        )
                    )
                )
                # Event dependencies. As of Mar' 23, Argo Events docs suggest using
                # Jetstream event bus rather than NATS streaming bus since the latter
                # doesn't support multiple combos of the same event name and event
                # source name.
                .dependencies(
                    # Event dependencies don't entertain dots
                    EventDependency(event["sanitized_name"]).event_name(
                        ARGO_EVENTS_EVENT
                    )
                    # TODO: Alternatively fetch this from @trigger config options
                    .event_source_name(ARGO_EVENTS_EVENT_SOURCE).filters(
                        # Ensure that event name matches and all required parameter
                        # fields are present in the payload. There is a possibility of
                        # dependency on an event where none of the fields are required.
                        # At the moment, this event is required but the restriction
                        # can be removed if needed.
                        EventDependencyFilter().exprs(
                            [
                                {
                                    "expr": "name == '%s'" % event["name"],
                                    "fields": [
                                        {"name": "name", "path": "body.payload.name"}
                                    ],
                                }
                            ]
                            + [
                                {
                                    # The expression is trivially true; presumably the
                                    # filter only passes when the field `path` can be
                                    # resolved, making this a presence check - confirm
                                    # against Argo Events expr-filter semantics.
                                    "expr": "true == true",  # field name is present
                                    "fields": [
                                        {
                                            "name": "field",
                                            "path": "body.payload.%s" % v,
                                        }
                                    ],
                                }
                                for parameter_name, v in event.get(
                                    "parameters", {}
                                ).items()
                                # only for required parameters
                                if self.parameters[parameter_name]["is_required"]
                            ]
                            + [
                                {
                                    "expr": "field == '%s'" % v,  # trigger_on_finish
                                    "fields": [
                                        {
                                            "name": "field",
                                            "path": "body.payload.%s" % filter_key,
                                        }
                                    ],
                                }
                                for filter_key, v in event.get("filters", {}).items()
                                if v
                            ]
                        )
                    )
                    for event in self.triggers
                )
            )
        )