eks_infrastructure/eks_cleanup/eks_cleanup_job.py (63 lines of code) (raw):
from kubernetes import client, config
from datetime import datetime
from invoke import run
import pytz
import boto3
import logging
import sys
# Module logger: DEBUG and above, written to stdout.
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler(sys.stdout))
LOGGER.setLevel(logging.DEBUG)

# Resources that have been up for at least this many hours get deleted.
JOB_TIMEOUT = 3
AWS_REGION = "us-west-2"

# One PR and one MAINLINE cluster per framework, PR clusters first:
# mxnet-PR, pytorch-PR, tensorflow-PR, mxnet-MAINLINE, pytorch-MAINLINE, tensorflow-MAINLINE
EKS_CLUSTERS = [
    f"{framework}-{stage}"
    for stage in ("PR", "MAINLINE")
    for framework in ("mxnet", "pytorch", "tensorflow")
]

# IAM role (in the calling account) passed to eksctl as the cluster authenticator role.
EKS_CLUSTER_MANAGER_ROLE_NAME = "clusterManagerRole"
def get_run_time(creation_time, current_time=None):
    """
    Get the time difference between resource creation time and current time in whole hours.

    :param creation_time: datetime at which the resource was created. Timezone-aware values
        are used as-is; naive values are assumed to be UTC.
    :param current_time: optional reference "now" (same aware/naive rules). Defaults to the
        current UTC time; mainly useful to make the computation deterministic in tests.
    :return: int, elapsed hours rounded down (floor), so it is negative when creation_time
        lies in the future.
    """
    from datetime import timedelta, timezone

    def _as_utc(moment):
        # Subtracting a naive datetime from an aware one raises TypeError; treat naive as UTC.
        return moment.replace(tzinfo=timezone.utc) if moment.tzinfo is None else moment

    if current_time is None:
        # datetime.utcnow() is deprecated (3.12+) and returns a naive value; now(tz) is aware.
        current_time = datetime.now(timezone.utc)
    elapsed = _as_utc(current_time) - _as_utc(creation_time)
    # timedelta // timedelta is exact integer floor division (no float rounding).
    return elapsed // timedelta(hours=1)
def delete_resources(list_item, k8s_api, job_type, namespace):
    """
    Delete every resource in ``list_item`` that has been up for at least JOB_TIMEOUT hours.

    Resources whose name contains "mxnet-operator" are never deleted.

    :param list_item: Kubernetes list response; each entry of ``list_item.items`` must expose
        ``metadata.name`` and ``metadata.creation_timestamp``
    :param k8s_api: API client that owns the delete call for ``job_type``
        (``AppsV1Api`` for "deployment", ``CoreV1Api`` for "pod")
    :param job_type: kind of resource held in ``list_item``: "deployment" or "pod"
    :param namespace: namespace the resources are deleted from
    :raises ValueError: if ``job_type`` is neither "deployment" nor "pod"
    """
    delete_method_names = {
        "deployment": "delete_namespaced_deployment",
        "pod": "delete_namespaced_pod",
    }
    # Fail loudly on an unknown type instead of logging "Deleting ..." and deleting nothing.
    if job_type not in delete_method_names:
        raise ValueError(
            f"Unsupported job_type {job_type!r}; expected one of {sorted(delete_method_names)}"
        )
    delete_resource = getattr(k8s_api, delete_method_names[job_type])

    for item in list_item.items:
        item_name = item.metadata.name
        item_creation_time = item.metadata.creation_timestamp
        LOGGER.info(f"Resource name {item_name}")
        LOGGER.info(f"Resource creation time {item_creation_time}")
        # Do not delete the kubeflow mxnet operator as it is a system resource and exists in default namespace
        if "mxnet-operator" in item_name:
            continue
        hours = get_run_time(item_creation_time)
        LOGGER.info(f"Resource {item_name} up time in hours: {hours}")
        if hours < JOB_TIMEOUT:
            continue
        LOGGER.info(f"Deleting resource {item_name}")
        try:
            delete_resource(item_name, namespace)
        except client.rest.ApiException as exc:
            # The resource can vanish between listing and deletion, e.g. a pod that was
            # garbage-collected after its deployment was deleted earlier in the same run
            # (depending on the propagation policy). It is already gone, which is the goal,
            # so keep going rather than aborting cleanup of the remaining resources/clusters.
            if exc.status != 404:
                raise
            LOGGER.info(f"Resource {item_name} was already deleted")
def run_cleanup_job(namespace="default"):
    """
    List the deployments and pods in ``namespace`` and delete those eligible for cleanup.

    :param namespace: Kubernetes namespace to clean up; defaults to "default"
    """
    core_v1_api = client.CoreV1Api()
    apps_v1_api = client.AppsV1Api()
    # Both lists are fetched before any deletion happens, so the pod list may still contain
    # pods owned by deployments deleted just below.
    list_deployment = apps_v1_api.list_namespaced_deployment(namespace=namespace)
    list_pod = core_v1_api.list_namespaced_pod(namespace=namespace)
    delete_resources(list_deployment, apps_v1_api, "deployment", namespace)
    delete_resources(list_pod, core_v1_api, "pod", namespace)
def main():
    """
    Run the stale-resource cleanup on every cluster in EKS_CLUSTERS.

    For each cluster: write its kubeconfig with eksctl (authenticating via the
    EKS_CLUSTER_MANAGER_ROLE_NAME role of the calling AWS account), load that
    kubeconfig, log the active context and run the cleanup job.
    """
    caller_identity = boto3.client("sts").get_caller_identity()
    manager_role_arn = (
        f"arn:aws:iam::{caller_identity.get('Account')}:role/{EKS_CLUSTER_MANAGER_ROLE_NAME}"
    )

    for cluster_name in EKS_CLUSTERS:
        # Point the local kubeconfig at this cluster before talking to its API
        kubeconfig_cmd = (
            f"eksctl utils write-kubeconfig --cluster {cluster_name} "
            f"--authenticator-role-arn {manager_role_arn} --region {AWS_REGION}"
        )
        run(kubeconfig_cmd)
        config.load_kube_config()
        active_context = config.list_kube_config_contexts()[1]
        LOGGER.info(f"Current EKS cluster {active_context['name']}")
        run_cleanup_job()
# Only run the cleanup when executed as a script, so that importing this module
# (e.g. from tests or tooling) does not touch live clusters.
if __name__ == "__main__":
    main()