cdsreaper/jobwatcher.py (108 lines of code) (raw):

from kubernetes import client, config, watch import kubernetes.client.exceptions from kubernetes.client.models.v1_job import V1Job from kubernetes.client.models.v1_job_status import V1JobStatus from kubernetes.client.models.v1_job_condition import V1JobCondition from kubernetes.client.models.v1_job_list import V1JobList from messagesender import MessageSender from journal import Journal import sys from models import * logger = logging.getLogger(__name__) class JobWatcher(object): def __init__(self, api_client: client.BatchV1Api, sender: MessageSender, journal: Journal, namespace: str): self._batchv1 = api_client self._namespace = namespace self._sender = sender self._journal = journal @staticmethod def job_is_starting(s: V1JobStatus)->bool: """ returns true if the given job is just starting up, i.e. it has a blank start_time, succeeded and failed counts :param s: V1JobStatus object to be interrogated :return: boolean """ return s.start_time is None and s.active is None and s.failed is None and s.succeeded is None @staticmethod def job_is_running(s: V1JobStatus)->bool: """ returns true if the given job has more than one active pod, i.e. it is running now :param s: V1JobStatus object to be interrogated :return: boolean """ return s.active is not None and s.active>0 and (s.failed is None or s.failed==0) @staticmethod def job_is_retry(s: V1JobStatus)->bool: """ returns true if the given job is retrying, i.e. it's active and has at least one failure :param s: :return: """ return s.active is not None and s.active>0 and s.failed>0 @staticmethod def job_is_failed(s:V1JobStatus)->bool: """ returns true if the given job is NOT running and it has no successful completions :param s: V1JobStatus object to be interrogated :return: boolean """ return (s.active is None or s.active==0) and (s.succeeded is None or s.succeeded==0) and s.start_time is not None @staticmethod def job_is_success(s:V1JobStatus)->bool: """ returns true if the given job is NOT running and it has successful completions :param s: V1JobStatus object to be interrogated :return: boolean """ return (s.active is None or s.active==0) and (s.succeeded is not None and s.succeeded>0) @staticmethod def get_job_status_string(j:V1Job)->str: logger.debug("Current job status dump: {0}".format(j.status)) if JobWatcher.job_is_running(j.status): return "running" elif JobWatcher.job_is_retry(j.status): return "retry" elif JobWatcher.job_is_starting(j.status): return "starting" elif JobWatcher.job_is_failed(j.status): return "failed" elif JobWatcher.job_is_success(j.status): return "success" @staticmethod def get_most_recent_condition(conditions:list) -> V1JobCondition: if len(conditions)==0: return None sorted_conditions = sorted(conditions, key=lambda c: c.last_probe_time, reverse=True) return sorted_conditions[0] @staticmethod def get_job_failure_reason(s: V1JobStatus) -> str: maybe_cond = JobWatcher.get_most_recent_condition(s.conditions) if maybe_cond is None: return "Unknown" return "{0} - {1}".format(maybe_cond.reason, maybe_cond.message) def check_job(self, j:V1Job): status = self.get_job_status_string(j) logger.info("Job {0} ({1}) is in status {2}".format(j.metadata.name, j.metadata.uid, status)) routing_key = "cds.job.{0}".format(status) message_body = { "job-id": j.metadata.uid, "job-name": j.metadata.name, "job-namespace": j.metadata.namespace, "retry-count": j.status.failed if j.status.failed is not None else 0 } if status=="failed": message_body["failure-reason"] = JobWatcher.get_job_failure_reason(j.status) return self._sender.notify(routing_key, message_body) def _watcher(self): """ internal method, forming the job watcher loop. Does not return. :return: """ watcher = watch.Watch() resource_version = self._journal.get_most_recent_event() if resource_version is None: logger.info("Could not find a journalled resource version to start watch at, starting from most recent") initial_status:V1JobList = self._batchv1.list_namespaced_job(self._namespace) resource_version = initial_status.metadata.resource_version logger.info("Initiating job watch at resource version {0}".format(resource_version)) try: for event in watcher.stream(self._batchv1.list_namespaced_job, self._namespace, resource_version=resource_version): logger.debug("Received job event: {0}".format(event['type'])) if isinstance(event["object"], V1Job): if not event["object"].metadata.name.startswith("cds-"): logger.info("Job {0} is not a cds job, ignoring".format(event["object"].metadata.name)) continue if event["type"]=="DELETED": # we are not interested in the job object being deleted, # it will have already been registered as succeeded/failed at this point. continue self.check_job(event["object"]) self._journal.record_processed(event["object"].metadata.resource_version) else: logger.warning("received notification with unexpected type {0}".format(type(event["object"]))) except kubernetes.client.exceptions.ApiException as err: logger.warning("Could not watch resource: cluster said {0}".format(str(err))) if err.status==410: logger.warning("Restarting from the most recent event") self._journal.clear_journal() return self._watcher() else: logger.error("Can't recover from {0} error".format(err.status)) raise def run_sync(self): """ runs the job watcher synchronously. Does not return. :return: """ try: self._watcher() except Exception as e: logger.exception("Could not run the watcher: {0}".format(e)) sys.exit(2)