in cdsreaper/jobwatcher.py [0:0]
def _watcher(self):
    """
    Internal method, forming the job watcher loop.

    Streams job events for ``self._namespace`` starting at the most recently
    journalled resource version. If the journal has no entry, the current job
    list is fetched and its resource version is used instead. Every event
    for a job whose name starts with ``cds-`` (other than ``DELETED``) is
    passed to ``self.check_job`` and its resource version is then journalled.

    If the cluster answers with HTTP 410 the journal is cleared and the watch
    is restarted from the most recent resource version. The restart is done
    iteratively rather than by recursion, so repeated 410 responses over the
    lifetime of the process cannot exhaust the interpreter stack.

    :return: None, and only if the event stream ends without an error.
    :raises kubernetes.client.exceptions.ApiException: for any API error
        whose status is not 410.
    """
    while True:
        resource_version = self._journal.get_most_recent_event()
        if resource_version is None:
            logger.info("Could not find a journalled resource version to start watch at, starting from most recent")
            # Deliberately outside the try below: a failure of this initial
            # listing is not recoverable here and propagates to the caller.
            initial_status:V1JobList = self._batchv1.list_namespaced_job(self._namespace)
            resource_version = initial_status.metadata.resource_version
        logger.info("Initiating job watch at resource version {0}".format(resource_version))

        # A fresh Watch is created for every (re)start of the stream.
        watcher = watch.Watch()
        try:
            for event in watcher.stream(self._batchv1.list_namespaced_job,
                                        self._namespace,
                                        resource_version=resource_version):
                logger.debug("Received job event: {0}".format(event['type']))
                job = event["object"]
                if not isinstance(job, V1Job):
                    logger.warning("received notification with unexpected type {0}".format(type(job)))
                    continue
                if not job.metadata.name.startswith("cds-"):
                    logger.info("Job {0} is not a cds job, ignoring".format(job.metadata.name))
                    continue
                if event["type"]=="DELETED":
                    # we are not interested in the job object being deleted,
                    # it will have already been registered as succeeded/failed at this point.
                    continue
                self.check_job(job)
                # Journal only after check_job has returned, so an event whose
                # processing raised is not marked as processed.
                self._journal.record_processed(job.metadata.resource_version)
        except kubernetes.client.exceptions.ApiException as err:
            logger.warning("Could not watch resource: cluster said {0}".format(str(err)))
            if err.status!=410:
                logger.error("Can't recover from {0} error".format(err.status))
                raise
            # 410: the resource version we asked for can no longer be served.
            # Drop the journal so the next pass of the loop starts from the
            # cluster's current resource version.
            logger.warning("Restarting from the most recent event")
            self._journal.clear_journal()
        else:
            # The stream finished without raising; nothing left to watch.
            return