in metaflow/plugins/kubernetes/kubernetes_decorator.py
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
# Executing Kubernetes jobs requires a non-local datastore.
if flow_datastore.TYPE not in ("s3", "azure", "gs"):
raise KubernetesException(
"The *@kubernetes* decorator requires --datastore=s3 or --datastore=azure or --datastore=gs at the moment."
)
# Set internal state.
self.logger = logger
self.environment = environment
self.step = step
self.flow_datastore = flow_datastore
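# Validate the requested QoS class against the supported Kubernetes QoS classes.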
if (
self.attributes["qos"] is not None
# Case-insensitive matching.
and self.attributes["qos"].lower()
not in [c.lower() for c in SUPPORTED_KUBERNETES_QOS_CLASSES]
):
raise MetaflowException(
"*%s* is not a valid Kubernetes QoS class. Choose one of the following: %s"
% (self.attributes["qos"], ", ".join(SUPPORTED_KUBERNETES_QOS_CLASSES))
)
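# @kubernetes and @batch are mutually exclusive for a step.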
if any([deco.name == "batch" for deco in decos]):
raise MetaflowException(
"Step *{step}* is marked for execution both on AWS Batch and "
"Kubernetes. Please use one or the other.".format(step=step)
)
if any([deco.name == "parallel" for deco in decos]) and any(
[deco.name == "catch" for deco in decos]
):
raise MetaflowException(
"Step *{step}* contains a @parallel decorator "
"with the @catch decorator. @catch is not supported with @parallel on Kubernetes.".format(
step=step
)
)
# Set run time limit for the Kubernetes job.
self.run_time_limit = get_run_time_limit_for_task(decos)
if self.run_time_limit < 60:
raise KubernetesException(
"The timeout for step *{step}* should be at least 60 seconds for "
"execution on Kubernetes.".format(step=step)
)
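# Merge resource requests declared via @resources into the @kubernetes attributes.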
for deco in decos:
if isinstance(deco, ResourcesDecorator):
for k, v in deco.attributes.items():
# If GPU count is specified, explicitly set it in self.attributes.
if k == "gpu" and v != None:
self.attributes["gpu"] = v
if k in self.attributes:
if self.defaults[k] is None:
# Skip attributes whose expected value isn't numeric (int/float).
continue
# We use the larger of the @resources and @kubernetes attributes.
# TODO: Fix https://github.com/Netflix/metaflow/issues/467
my_val = self.attributes.get(k)
if not (my_val is None and v is None):
self.attributes[k] = str(
max(float(my_val or 0), float(v or 0))
)
# Check GPU vendor.
if self.attributes["gpu_vendor"].lower() not in ("amd", "nvidia"):
raise KubernetesException(
"GPU vendor *{}* for step *{step}* is not currently supported.".format(
self.attributes["gpu_vendor"], step=step
)
)
# CPU, Disk, and Memory values should be greater than 0.
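# Note: `unicode` and `basestring` here are assumed to be py2/py3 compatibility
# aliases (both resolve to str on Python 3) defined at module scope.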
for attr in ["cpu", "disk", "memory"]:
if not (
isinstance(self.attributes[attr], (int, unicode, basestring, float))
and float(self.attributes[attr]) > 0
):
raise KubernetesException(
"Invalid {} value *{}* for step *{step}*; it should be greater than 0".format(
attr, self.attributes[attr], step=step
)
)
if self.attributes["gpu"] is not None and not (
isinstance(self.attributes["gpu"], (int, unicode, basestring))
and float(self.attributes["gpu"]).is_integer()
):
raise KubernetesException(
"Invalid GPU value *{}* for step *{step}*; it should be an integer".format(
self.attributes["gpu"], step=step
)
)
if self.attributes["tmpfs_size"]:
if not (
isinstance(self.attributes["tmpfs_size"], (int, unicode, basestring))
and int(self.attributes["tmpfs_size"]) > 0
):
raise KubernetesException(
"Invalid tmpfs_size value: *{size}* for step *{step}* (should be an integer greater than 0)".format(
size=self.attributes["tmpfs_size"], step=step
)
)
if self.attributes["shared_memory"]:
if not (
isinstance(self.attributes["shared_memory"], int)
and int(self.attributes["shared_memory"]) > 0
):
raise KubernetesException(
"Invalid shared_memory value: *{size}* for step *{step}* (should be an integer greater than 0)".format(
size=self.attributes["shared_memory"], step=step
)
)
validate_kube_labels(self.attributes["labels"])
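# Illustrative usage (not part of this module): the attributes validated above
# typically come from a step declaration such as
#
#     @resources(cpu=2, memory=8000)
#     @kubernetes(cpu=4, gpu=1, gpu_vendor="nvidia")
#     @step
#     def train(self):
#         ...
#
# in which case the larger of the @resources and @kubernetes values is used.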