in metaflow/plugins/kubernetes/kubernetes_decorator.py [0:0]
def init(self):
    """Resolve @kubernetes attributes against the KUBERNETES_* configuration.

    Any attribute left unset (falsy) is filled from the matching KUBERNETES_*
    configuration value. String-encoded settings are parsed into Python objects
    (node selector, and the tolerations / persistent volume claims JSON).
    Derived values are then computed: the compute-pool node selector, the
    default container image and registry prefix, and the default tmpfs size.
    All results are written back into ``self.attributes``.

    Raises
    ------
    KubernetesException
        If ``tolerations`` contains an entry without ``.keys()`` (not a
        mapping), or one that uses keys unknown to
        ``kubernetes.client.V1Toleration``. This validation is skipped when
        the ``kubernetes`` client package cannot be imported.
    ValueError
        (``json.JSONDecodeError``) if KUBERNETES_TOLERATIONS or
        KUBERNETES_PERSISTENT_VOLUME_CLAIMS is used and is not valid JSON.
    """
    super(KubernetesDecorator, self).init()

    # Fill attributes the decorator left unset from the configuration values.
    if not self.attributes["namespace"]:
        self.attributes["namespace"] = KUBERNETES_NAMESPACE
    if not self.attributes["service_account"]:
        self.attributes["service_account"] = KUBERNETES_SERVICE_ACCOUNT
    if not self.attributes["gpu_vendor"]:
        self.attributes["gpu_vendor"] = KUBERNETES_GPU_VENDOR
    if not self.attributes["node_selector"] and KUBERNETES_NODE_SELECTOR:
        self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR
    if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS:
        self.attributes["tolerations"] = json.loads(KUBERNETES_TOLERATIONS)
    if (
        not self.attributes["persistent_volume_claims"]
        and KUBERNETES_PERSISTENT_VOLUME_CLAIMS
    ):
        self.attributes["persistent_volume_claims"] = json.loads(
            KUBERNETES_PERSISTENT_VOLUME_CLAIMS
        )
    if not self.attributes["image_pull_policy"] and KUBERNETES_IMAGE_PULL_POLICY:
        self.attributes["image_pull_policy"] = KUBERNETES_IMAGE_PULL_POLICY

    # A string node selector is split on commas and parsed into a mapping by
    # parse_kube_keyvalue_list (presumably key=value items -- see that helper).
    if isinstance(self.attributes["node_selector"], str):
        self.attributes["node_selector"] = parse_kube_keyvalue_list(
            self.attributes["node_selector"].split(",")
        )

    if self.attributes["compute_pool"]:
        # Build a new dict rather than updating in place: node_selector may be
        # a mapping the user passed in and reuses elsewhere (e.g. one dict
        # shared by several steps), which must not pick up this step's
        # compute-pool label.
        node_selector = dict(self.attributes["node_selector"] or {})
        node_selector["outerbounds.co/compute-pool"] = self.attributes[
            "compute_pool"
        ]
        self.attributes["node_selector"] = node_selector

    if self.attributes["tolerations"]:
        try:
            from kubernetes.client import V1Toleration
        except (NameError, ImportError):
            # The kubernetes client is not importable here, so the
            # tolerations cannot be validated; skip validation.
            pass
        else:
            # Keep the try above limited to the import so that errors raised
            # during validation are not silently swallowed.
            for toleration in self.attributes["tolerations"]:
                try:
                    invalid_keys = [
                        k
                        for k in toleration.keys()
                        if k not in V1Toleration.attribute_map.keys()
                    ]
                except AttributeError:
                    # Typically the entry has no .keys(), i.e. is not a mapping.
                    raise KubernetesException(
                        "Unable to parse tolerations: %s"
                        % self.attributes["tolerations"]
                    )
                if invalid_keys:
                    raise KubernetesException(
                        "Tolerations parameter contains invalid keys: %s"
                        % invalid_keys
                    )

    # CPU, memory and disk can be supplied via the METAFLOW_KUBERNETES_CPU,
    # METAFLOW_KUBERNETES_MEMORY and METAFLOW_KUBERNETES_DISK environment
    # variables (KUBERNETES_CPU / _MEMORY / _DISK here). They replace the
    # decorator value only while it still equals the decorator default, so
    # an explicitly passed value that happens to equal the default is also
    # overridden.
    if self.attributes["cpu"] == self.defaults["cpu"] and KUBERNETES_CPU:
        self.attributes["cpu"] = KUBERNETES_CPU
    if self.attributes["memory"] == self.defaults["memory"] and KUBERNETES_MEMORY:
        self.attributes["memory"] = KUBERNETES_MEMORY
    if self.attributes["disk"] == self.defaults["disk"] and KUBERNETES_DISK:
        self.attributes["disk"] = KUBERNETES_DISK

    # Label source precedence (decreasing):
    # - System labels (set outside of decorator)
    # - Decorator labels: @kubernetes(labels={})
    # - Environment variable labels: METAFLOW_KUBERNETES_LABELS=
    deco_labels = (
        self.attributes["labels"] if self.attributes["labels"] is not None else {}
    )
    env_labels = (
        parse_kube_keyvalue_list(KUBERNETES_LABELS.split(","), False)
        if KUBERNETES_LABELS
        else {}
    )
    # Later entries win, so decorator labels override environment labels.
    self.attributes["labels"] = {**env_labels, **deco_labels}

    # Annotation precedence (decreasing):
    # - System annotations (set outside of decorator)
    # - Decorator annotations: @kubernetes(annotations={})
    # - Environment annotations: METAFLOW_KUBERNETES_ANNOTATIONS=
    deco_annotations = (
        self.attributes["annotations"]
        if self.attributes["annotations"] is not None
        else {}
    )
    env_annotations = (
        parse_kube_keyvalue_list(KUBERNETES_ANNOTATIONS.split(","), False)
        if KUBERNETES_ANNOTATIONS
        else {}
    )
    self.attributes["annotations"] = {**env_annotations, **deco_annotations}

    # If no docker image is explicitly specified, impute a default image.
    if not self.attributes["image"]:
        if KUBERNETES_CONTAINER_IMAGE:
            # metaflow-config specifies a docker image: use it.
            self.attributes["image"] = KUBERNETES_CONTAINER_IMAGE
        else:
            # Default to the vanilla Python image matching the major.minor
            # version of the Python interpreter launching the flow.
            major, minor = platform.python_version_tuple()[:2]
            self.attributes["image"] = "python:%s.%s" % (major, minor)

    # Prefix the configured docker registry when the image names none.
    if not get_docker_registry(self.attributes["image"]) and (
        KUBERNETES_CONTAINER_REGISTRY
    ):
        self.attributes["image"] = "%s/%s" % (
            KUBERNETES_CONTAINER_REGISTRY.rstrip("/"),
            self.attributes["image"],
        )

    # tmpfs is enabled without an explicit size: default to half of the
    # requested memory, like the Linux tmpfs default -
    # https://man7.org/linux/man-pages/man5/tmpfs.5.html
    # (Equivalent to the previous "use_tmpfs or tmpfs_size" check followed by
    # "not tmpfs_size".)
    if self.attributes["use_tmpfs"] and not self.attributes["tmpfs_size"]:
        self.attributes["tmpfs_size"] = int(self.attributes["memory"]) // 2

    if not self.attributes["shared_memory"]:
        self.attributes["shared_memory"] = KUBERNETES_SHARED_MEMORY
    if not self.attributes["port"]:
        self.attributes["port"] = KUBERNETES_PORT