# cdsresponder/rabbitmq/K8MessageProcessor.py (111 lines of code) (raw):
from .messageprocessor import MessageProcessor
import pika
from typing import Optional, List
import logging
from kubernetes import client, config
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.models.v1_pod_list import V1PodList
import os
import k8s.k8utils
import pathlib
# Module-level logger shared by everything in this file; logging.getLogger(__name__) names it after this module.
logger = logging.getLogger(__name__)
class K8Message(object):
    """
    Read-only view over the dictionary body of a job message.

    `schema` is a JSON-schema style description of the body. Only `job-id`, `job-name` and
    `job-namespace` are listed as required there, so `retry-count` and `failure-reason` may
    legitimately be missing and the matching properties return None in that case.
    """
    schema = {
        "type": "object",
        "properties": {
            "job-id": {"type": "string"},
            "job-name": {"type": "string"},
            "job-namespace": {"type": "string"},
            "retry-count": {"type": "number"},
            "failure-reason": {"type": "string"}
        },
        "required": ["job-id","job-name","job-namespace"]
    }

    def __init__(self, source:dict):
        """
        :param source: the message body as a dictionary. It is kept by reference, not copied.
        """
        self._content = source

    @property
    def job_id(self)->str:
        """
        :return: the `job-id` field
        :raises KeyError: if the field is missing from the body
        """
        return self._content["job-id"]

    @property
    def job_name(self)->str:
        """
        :return: the `job-name` field
        :raises KeyError: if the field is missing from the body
        """
        return self._content["job-name"]

    @property
    def job_namespace(self)->str:
        """
        :return: the `job-namespace` field
        :raises KeyError: if the field is missing from the body
        """
        return self._content["job-namespace"]

    @property
    def retry_count(self)->Optional[int]:
        """
        The optional `retry-count` field converted with int(), so a fractional number is truncated.

        :return: the retry count, or None if the field is missing, null or not convertible to an integer
        """
        # the field is not in the schema's "required" list, so it must be looked up without raising KeyError
        value = self._content.get("retry-count")
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # TypeError covers non-numeric types (e.g. a list), OverflowError covers float infinity
            logger.warning("invalid retry count data '{0}' is not a number".format(value))
            return None

    @property
    def failure_reason(self)->Optional[str]:
        """
        :return: the optional `failure-reason` field, or None if it is missing from the body
        """
        return self._content.get("failure-reason")
class K8MessageProcessor(MessageProcessor):
    """
    Processor for `cds.job.*` messages.

    When a message arrives with routing key `cds.job.failed` or `cds.job.success`, the logs of the
    job's pods are saved under POD_LOGS_BASEPATH (if set) and the job is then deleted from the
    cluster unless KEEP_JOBS is set to 'true' or 'yes'. Any other matching routing key is only logged.
    """
    schema = K8Message.schema
    routing_key = "cds.job.*"
    # read once, when the class body is executed
    pod_log_basepath = os.getenv("POD_LOGS_BASEPATH") #if this is not set then no pod logs will be written

    @staticmethod
    def get_should_keep_jobs()->bool:
        """
        Interpret the KEEP_JOBS environment variable (case-insensitively).

        :return: True for 'true'/'yes'; False for 'false'/'no' or when the variable is not set
        :raises ValueError: if the variable is set to anything else
        """
        value = os.getenv("KEEP_JOBS")
        if value is None:
            return False
        normalised = value.lower()
        if normalised in ("false", "no"):
            return False
        if normalised in ("true", "yes"):
            return True
        raise ValueError("You must set KEEP_JOBS to either 'yes' or 'no'. Remember to quote these strings in a yaml document.")

    def __init__(self, namespace:Optional[str]):
        """
        Load the Kubernetes configuration, create the API clients and work out the namespace in use.

        In-cluster configuration is tried first. If that fails, the kube config file named by
        KUBE_CONFIG (default `~/.kube/config`) is loaded instead.

        :param namespace: namespace to fall back to when k8s.k8utils.get_current_namespace() returns None
        :raises ValueError: if no namespace can be determined, or KEEP_JOBS holds an unrecognised value
        """
        try:
            config.load_incluster_config()
        except config.config_exception.ConfigException as e:
            # expanduser still resolves a home directory when HOME is unset, where os.getenv("HOME")
            # would hand None to os.path.join and raise TypeError
            default_kube_config = os.path.join(os.path.expanduser("~"), ".kube", "config")
            kube_config_file = os.getenv("KUBE_CONFIG", default_kube_config)
            logger.warning("Could not load in-cluster configuration: {0}. Trying external connection from {1}...".format(str(e), kube_config_file))
            config.load_kube_config(kube_config_file)

        self.should_keep_jobs = self.get_should_keep_jobs()
        self.batch = client.BatchV1Api()
        self.k8core = client.CoreV1Api()

        self.namespace = k8s.k8utils.get_current_namespace()
        if self.namespace is None:
            # a None result is treated as "not running in a cluster"
            if namespace is None:
                logger.error("If we are not running in a cluster you must specify a namespace within which to start jobs")
                raise ValueError("No namespace configured")
            logger.info("Not running in cluster, falling back to configured namespace {0}".format(namespace))
            self.namespace = namespace
        logger.info("Startup - we are in namespace {0}".format(self.namespace))

    def read_logs(self, job_name:str, job_namespace:str)->int:
        """
        Save the logs of every pod labelled `job-name=<job_name>` in the given namespace.

        Each pod gets its own file, `<POD_LOGS_BASEPATH>/<job_name>/<pod name>.log`. The directory is
        created if necessary. The file itself is produced by k8s.k8utils.dump_pod_logs.

        :param job_name: name of the job whose pods should be looked up
        :param job_namespace: namespace to list the pods from
        :return: number of pods found, or 0 if POD_LOGS_BASEPATH is not set
        """
        if self.pod_log_basepath is None:
            logger.warning("If you want pod logs to be saved, then you must set POD_LOGS_BASEPATH to a valid writable filepath")
            return 0

        pod_list:V1PodList = self.k8core.list_namespaced_pod(job_namespace, label_selector="job-name={0}".format(job_name))

        # ensure path exists
        destpath = os.path.join(self.pod_log_basepath, job_name)
        pathlib.Path(destpath).mkdir(parents=True, exist_ok=True)

        for pod in pod_list.items:
            filename = os.path.join(destpath, pod.metadata.name + ".log")
            k8s.k8utils.dump_pod_logs(pod.metadata.name, pod.metadata.namespace, filename)
        return len(pod_list.items)

    def safe_delete_job(self, job_name:str, job_namespace:str):
        """
        Delete a job with foreground propagation. Any failure is logged rather than raised.

        :param job_name: name of the job to delete
        :param job_namespace: namespace that the job lives in
        """
        try:
            self.batch.delete_namespaced_job(job_name, job_namespace, propagation_policy='Foreground')
        except Exception as e:
            # deliberately best-effort: a failed clean-up must not break message handling
            logger.error("Could not remove the job {0} from namespace {1}: {2}".format(job_name, job_namespace, str(e)))

    def valid_message_receive(self, channel: pika.spec.Channel, exchange_name, routing_key, delivery_tag, body):
        """
        Handle one job message.

        :param channel: not used by this method
        :param exchange_name: name of the exchange the message came from; only used for logging
        :param routing_key: routing key of the message; decides whether the job is treated as finished
        :param delivery_tag: not used by this method
        :param body: message body, wrapped in a K8Message. Presumably already validated against
                     `schema` by the caller - TODO confirm against MessageProcessor
        """
        msg = K8Message(body)
        logger.debug("Got a {0} message for job {1} ({2}) from exchange {3}".format(routing_key, msg.job_name, msg.job_id, exchange_name))

        if routing_key in ("cds.job.failed", "cds.job.success"):
            try:
                saved_logs = self.read_logs(msg.job_name, msg.job_namespace)
                logger.info("Job {0} terminated, saved {1} pod logs".format(msg.job_name, saved_logs))
            except Exception as e:
                # failing to save logs must not prevent the job from being cleaned up below
                logger.error("Could not save job logs for {0}: {1}".format(msg.job_name, str(e)), exc_info=e)

            if self.should_keep_jobs:
                logger.info("Retaining job information {0} in cluster as KEEP_JOBS is set to 'true' or 'yes'. Remove it or set to 'no' in order to remove completed jobs.".format(msg.job_name))
            else:
                logger.info("Removing completed job {0}...".format(msg.job_name))
                self.safe_delete_job(msg.job_name, msg.job_namespace)
        else:
            logger.info("Job {0} is in progress".format(msg.job_name))