in metaflow/plugins/kubernetes/kubernetes_jobsets.py
def dump(self):
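# Build the dictionary representation of one replicated job for the JobSet:
# a sanitized V1JobTemplateSpec together with its name and replica count.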
client = self._kubernetes_sdk
use_tmpfs = self._kwargs["use_tmpfs"]
tmpfs_size = self._kwargs["tmpfs_size"]
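# tmpfs is enabled either explicitly (use_tmpfs) or implicitly by
# specifying a tmpfs size without the flag.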
tmpfs_enabled = use_tmpfs or (tmpfs_size and not use_tmpfs)
shared_memory = (
int(self._kwargs["shared_memory"])
if self._kwargs["shared_memory"]
else None
)
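# Translate the requested QoS class and the cpu/memory/disk values into
# Kubernetes resource requests and limits.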
qos_requests, qos_limits = qos_requests_and_limits(
self._kwargs["qos"],
self._kwargs["cpu"],
self._kwargs["memory"],
self._kwargs["disk"],
)
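# Only construct a V1SecurityContext when a non-empty mapping was supplied,
# so the pod spec stays free of an empty security context otherwise.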
security_context = self._kwargs.get("security_context", {})
_security_context = {}
if security_context is not None and len(security_context) > 0:
_security_context = {
"security_context": client.V1SecurityContext(**security_context)
}
return dict(
name=self.name,
template=client.api_client.ApiClient().sanitize_for_serialization(
client.V1JobTemplateSpec(
metadata=client.V1ObjectMeta(
namespace=self._kwargs["namespace"],
# We don't set any annotations here
# since they have either been set in the JobSpec
# or at the JobSet level.
),
spec=client.V1JobSpec(
# Retries are handled by Metaflow when it executes the flow itself;
# the responsibility shifts to Kubernetes when Argo Workflows drives
# the execution.
backoff_limit=self._kwargs.get("retries", 0),
completions=1,
parallelism=1,
# Remove the job after a week. TODO: Make this configurable.
ttl_seconds_after_finished=7 * 60 * 60 * 24,
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
annotations=self._kwargs.get("annotations", {}),
labels=self._kwargs.get("labels", {}),
namespace=self._kwargs["namespace"],
),
spec=client.V1PodSpec(
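# Place each pod under the JobSet's headless-service subdomain and use
# the FQDN as the hostname so replicas can reach one another via stable
# DNS names.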
subdomain=self._kwargs["subdomain"],
set_hostname_as_fqdn=True,
# Timeout is set on the pod and not the job (important!)
active_deadline_seconds=self._kwargs[
"timeout_in_seconds"
],
# TODO (savin): Enable affinities for GPU scheduling.
# affinity=?,
containers=[
client.V1Container(
command=self._kwargs["command"],
termination_message_policy="FallbackToLogsOnError",
ports=(
[]
if self._kwargs["port"] is None
else [
client.V1ContainerPort(
container_port=int(
self._kwargs["port"]
)
)
]
),
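# The environment is assembled from three sources: user-supplied
# variables, downward-API field selectors, and tracing variables.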
env=[
client.V1EnvVar(name=k, value=str(v))
for k, v in self._kwargs.get(
"environment_variables", {}
).items()
]
# And some downward API magic. Add (key, value)
# pairs below to make pod metadata available
# within Kubernetes container.
+ [
client.V1EnvVar(
name=k,
value_from=client.V1EnvVarSource(
field_ref=client.V1ObjectFieldSelector(
field_path=str(v)
)
),
)
for k, v in self._kwargs.get(
"environment_variables_from_selectors",
{},
).items()
]
+ [
client.V1EnvVar(name=k, value=str(v))
for k, v in inject_tracing_vars({}).items()
],
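# Expose every referenced Kubernetes secret to the container as
# environment variables.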
env_from=[
client.V1EnvFromSource(
secret_ref=client.V1SecretEnvSource(
name=str(k),
# optional=True
)
)
for k in list(
self._kwargs.get("secrets", [])
)
if k
],
image=self._kwargs["image"],
image_pull_policy=self._kwargs[
"image_pull_policy"
],
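# Container names must be DNS-1123 compliant, so underscores in the
# step name are replaced with dashes.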
name=self._kwargs["step_name"].replace(
"_", "-"
),
resources=client.V1ResourceRequirements(
requests=qos_requests,
limits={
**qos_limits,
**(
    {
        # Normalize the vendor so the resource key reads
        # e.g. nvidia.com/gpu or amd.com/gpu.
        ("%s.com/gpu" % self._kwargs["gpu_vendor"]).lower(): str(
            self._kwargs["gpu"]
        )
    }
    # Don't set GPU limits if gpu isn't specified.
    if self._kwargs["gpu"] is not None
    else {}
),
},
),
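# Volume mounts: an optional tmpfs scratch volume, an optional /dev/shm
# mount for shared memory, and any user-supplied persistent volume claims.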
volume_mounts=(
[
client.V1VolumeMount(
mount_path=self._kwargs.get(
"tmpfs_path"
),
name="tmpfs-ephemeral-volume",
)
]
if tmpfs_enabled
else []
)
+ (
[
client.V1VolumeMount(
mount_path="/dev/shm", name="dhsm"
)
]
if shared_memory
else []
)
+ (
[
client.V1VolumeMount(
mount_path=path, name=claim
)
for claim, path in self._kwargs[
"persistent_volume_claims"
].items()
]
if self._kwargs["persistent_volume_claims"]
is not None
else []
),
**_security_context,
)
],
node_selector=self._kwargs.get("node_selector"),
# TODO (savin): Support image_pull_secrets
# image_pull_secrets=?,
# TODO (savin): Support preemption policies
# preemption_policy=?,
#
# A Container in a Pod may fail for a number of
# reasons, such as because the process in it exited
# with a non-zero exit code, or the Container was
# killed due to OOM etc. If this happens, fail the pod
# and let Metaflow handle the retries.
restart_policy="Never",
service_account_name=self._kwargs["service_account"],
# Terminate the container immediately on SIGTERM
termination_grace_period_seconds=0,
tolerations=[
client.V1Toleration(**toleration)
for toleration in self._kwargs.get("tolerations")
or []
],
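# The volumes below mirror the volume_mounts declared on the container above.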
volumes=(
[
client.V1Volume(
name="tmpfs-ephemeral-volume",
empty_dir=client.V1EmptyDirVolumeSource(
medium="Memory",
# Append the Mi unit explicitly; our default unit differs from the Kubernetes default.
size_limit="{}Mi".format(tmpfs_size),
),
)
]
if tmpfs_enabled
else []
)
+ (
[
client.V1Volume(
name="dhsm",
empty_dir=client.V1EmptyDirVolumeSource(
medium="Memory",
size_limit="{}Mi".format(shared_memory),
),
)
]
if shared_memory
else []
)
+ (
[
client.V1Volume(
name=claim,
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=claim
),
)
for claim in self._kwargs[
"persistent_volume_claims"
].keys()
]
if self._kwargs["persistent_volume_claims"]
is not None
else []
),
),
),
),
)
),
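# Number of Job replicas this entry contributes to the JobSet
# (one pod each, since parallelism is 1).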
replicas=self._kwargs["replicas"],
)