in cdsresponder/rabbitmq/K8MessageProcessor.py [0:0]
def valid_message_receive(self, channel: pika.spec.Channel, exchange_name, routing_key, delivery_tag, body):
msg = K8Message(body)
logger.debug("Got a {0} message for job {1} ({2}) from exchange {3}".format(routing_key, msg.job_name, msg.job_id, exchange_name))
if routing_key == "cds.job.failed" or routing_key == "cds.job.success":
try:
saved_logs = self.read_logs(msg.job_name, msg.job_namespace)
logger.info("Job {0} terminated, saved {1} pod logs".format(msg.job_name, saved_logs))
except Exception as e:
logger.error("Could not save job logs for {0}: {1}".format(msg.job_name, str(e)), exc_info=e)
if self.should_keep_jobs:
logger.info("Retaining job information {0} in cluster as KEEP_JOBS is set to 'true' or 'yes'. Remove it or set to 'no' in order to remove completed jobs.")
else:
logger.info("Removing completed job {0}...".format(msg.job_name))
self.safe_delete_job(msg.job_name, msg.job_namespace)
else:
logger.info("Job {0} is in progress".format(msg.job_name))