cdsresponder/cds/cds_launcher.py (86 lines of code) (raw):
from kubernetes import client, config
import kubernetes.client.models
import logging
from hikaru import load_full_yaml, Job, get_clean_dict
import pathlib
import os
import re
from k8s.k8utils import get_current_namespace
logger = logging.getLogger(__name__)
class NotInCluster(Exception):
pass
class CDSLauncher(object):
def __init__(self, namespace:str):
try:
config.load_incluster_config()
except config.config_exception.ConfigException as e:
kube_config_file = os.getenv("KUBE_CONFIG", os.path.join(os.getenv("HOME"), ".kube", "config"))
logger.warning("Could not load in-cluster configuration: {0}. Trying external connection from {1}...".format(str(e), kube_config_file))
config.load_kube_config(kube_config_file)
self.batch = client.BatchV1Api()
self.namespace = get_current_namespace()
if self.namespace is None and namespace is not None:
logger.info("Not running in cluster, falling back to configured namespace {0}", namespace)
self.namespace = namespace
elif self.namespace is None and namespace is None:
logger.error("If we are not running in a cluster you must specify a namespace within which to start jobs")
raise ValueError("No namespace configured")
logger.info("Startup - we are in namespace {0}".format(self.namespace))
def find_job_template(self):
filepath = os.path.join(os.getenv("TEMPLATES_PATH"), "cdsjob.yaml")
if os.path.exists(filepath):
return filepath
filepath = os.path.join(pathlib.Path(__name__).parent.absolute(), "templates","cdsjob.yaml")
if os.path.exists(filepath):
return filepath
filepath = "/etc/cdsresponder/templates/cdsjob.yaml"
if os.path.exists(filepath):
return filepath
raise RuntimeError("No path to cdsjob could be found")
def load_job_template(self):
filepath = self.find_job_template()
logger.debug("Loading job template from {0}".format(filepath))
loaded = load_full_yaml(filepath)
if len(loaded)==0:
raise ValueError("Nothing was defined in cdsjob.yaml")
jobs = [x for x in loaded if isinstance(x, Job)]
if len(jobs)==0:
raise ValueError("Of {0} objects defined in cdsjob.yaml, none of them was a Job".format(len(loaded)))
return jobs[0]
def build_job_doc(self, job_name:str, cmd:list, labels:dict):
content_template = self.load_job_template()
if not isinstance(content_template, Job):
raise TypeError("cdsjob template must be for a Job, we got a {0}!".format(content_template.__class__.__name__))
content_template.metadata.name = self.sanitise_job_name(job_name)
content_template.spec.template.spec.containers[0].command = cmd
existing_labels = content_template.metadata.labels
if existing_labels is None:
existing_labels = {}
existing_labels.update(labels)
content_template.metadata.labels = existing_labels
return get_clean_dict(content_template)
@staticmethod
def sanitise_job_name(job_name:str) -> str:
"""
converts the incoming string into a form that is suitable for a Kubernetes job name
:param job_name: string to convert
:return:
"""
first_sub = re.sub(r"\s+","-", job_name)
second_sub = re.sub(r'[^A-Za-z0-9-]', "", first_sub).lower()
if len(second_sub) <= 59: #kubernetes name length is 63 chars, we prepend "cds-" so take that off
truncated = second_sub
else:
truncated = second_sub[0:59]
third_sub = re.sub(r'[^a-z0-9]+$', "", truncated)
fourth_sub = re.sub(r'^[^a-z0-9]+', "", third_sub)
return fourth_sub
def launch_cds_job(self, inmeta_path: str, job_name: str, route_name: str, labels:dict) -> kubernetes.client.models.V1Job:
command_parts = [
"/usr/local/bin/cds_run.pl",
"--input-inmeta",
inmeta_path,
"--route",
route_name
]
jobdoc = self.build_job_doc(job_name, command_parts, labels)
logger.debug("Built job doc for submission: {0}".format(jobdoc))
return self.batch.create_namespaced_job(
body=jobdoc,
namespace=self.namespace
)