def step_init()

in metaflow/plugins/kubernetes/kubernetes_decorator.py [0:0]


    def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
        # Executing Kubernetes jobs requires a non-local datastore.
        if flow_datastore.TYPE not in ("s3", "azure", "gs"):
            raise KubernetesException(
                "The *@kubernetes* decorator requires --datastore=s3 or --datastore=azure or --datastore=gs at the moment."
            )

        # Set internal state.
        self.logger = logger
        self.environment = environment
        self.step = step
        self.flow_datastore = flow_datastore

        if (
            self.attributes["qos"] is not None
            # case insensitive matching.
            and self.attributes["qos"].lower()
            not in [c.lower() for c in SUPPORTED_KUBERNETES_QOS_CLASSES]
        ):
            raise MetaflowException(
                "*%s* is not a valid Kubernetes QoS class. Choose one of the following: %s"
                % (self.attributes["qos"], ", ".join(SUPPORTED_KUBERNETES_QOS_CLASSES))
            )

        if any([deco.name == "batch" for deco in decos]):
            raise MetaflowException(
                "Step *{step}* is marked for execution both on AWS Batch and "
                "Kubernetes. Please use one or the other.".format(step=step)
            )

        if any([deco.name == "parallel" for deco in decos]) and any(
            [deco.name == "catch" for deco in decos]
        ):
            raise MetaflowException(
                "Step *{step}* contains a @parallel decorator "
                "with the @catch decorator. @catch is not supported with @parallel on Kubernetes.".format(
                    step=step
                )
            )

        # Set run time limit for the Kubernetes job.
        self.run_time_limit = get_run_time_limit_for_task(decos)
        if self.run_time_limit < 60:
            raise KubernetesException(
                "The timeout for step *{step}* should be at least 60 seconds for "
                "execution on Kubernetes.".format(step=step)
            )

        for deco in decos:
            if isinstance(deco, ResourcesDecorator):
                for k, v in deco.attributes.items():
                    # If GPU count is specified, explicitly set it in self.attributes.
                    if k == "gpu" and v != None:
                        self.attributes["gpu"] = v

                    if k in self.attributes:
                        if self.defaults[k] is None:
                            # skip if expected value isn't an int/float
                            continue
                        # We use the larger of @resources and @batch attributes
                        # TODO: Fix https://github.com/Netflix/metaflow/issues/467
                        my_val = self.attributes.get(k)
                        if not (my_val is None and v is None):
                            self.attributes[k] = str(
                                max(float(my_val or 0), float(v or 0))
                            )

        # Check GPU vendor.
        if self.attributes["gpu_vendor"].lower() not in ("amd", "nvidia"):
            raise KubernetesException(
                "GPU vendor *{}* for step *{step}* is not currently supported.".format(
                    self.attributes["gpu_vendor"], step=step
                )
            )

        # CPU, Disk, and Memory values should be greater than 0.
        for attr in ["cpu", "disk", "memory"]:
            if not (
                isinstance(self.attributes[attr], (int, unicode, basestring, float))
                and float(self.attributes[attr]) > 0
            ):
                raise KubernetesException(
                    "Invalid {} value *{}* for step *{step}*; it should be greater than 0".format(
                        attr, self.attributes[attr], step=step
                    )
                )

        if self.attributes["gpu"] is not None and not (
            isinstance(self.attributes["gpu"], (int, unicode, basestring))
            and float(self.attributes["gpu"]).is_integer()
        ):
            raise KubernetesException(
                "Invalid GPU value *{}* for step *{step}*; it should be an integer".format(
                    self.attributes["gpu"], step=step
                )
            )

        if self.attributes["tmpfs_size"]:
            if not (
                isinstance(self.attributes["tmpfs_size"], (int, unicode, basestring))
                and int(self.attributes["tmpfs_size"]) > 0
            ):
                raise KubernetesException(
                    "Invalid tmpfs_size value: *{size}* for step *{step}* (should be an integer greater than 0)".format(
                        size=self.attributes["tmpfs_size"], step=step
                    )
                )

        if self.attributes["shared_memory"]:
            if not (
                isinstance(self.attributes["shared_memory"], int)
                and int(self.attributes["shared_memory"]) > 0
            ):
                raise KubernetesException(
                    "Invalid shared_memory value: *{size}* for step *{step}* (should be an integer greater than 0)".format(
                        size=self.attributes["shared_memory"], step=step
                    )
                )

        validate_kube_labels(self.attributes["labels"])