cdsreaper/cdsreaper.py (54 lines of code) (raw):
#!/usr/bin/env python
import kubernetes
import os
import logging
from jobwatcher import JobWatcher
import sys
from messagesender import MessageSender#
from journal import Journal
import pika
# Configure the root logger once, at import time. The format string uses
# "{"-style fields, hence style="{".
logging.basicConfig(
    format="{asctime} {name}|{funcName} [{levelname}] {message}",
    level=logging.DEBUG,
    style="{",
)

# The root logger runs at DEBUG; hold the pika and kubernetes client loggers at
# WARNING so their DEBUG/INFO records are dropped. setLevel() is used rather
# than assigning .level directly so that the level is validated and the
# logger's cached isEnabledFor() results are invalidated.
pikaLogger = logging.getLogger("pika")
pikaLogger.setLevel(logging.WARNING)
k8logger = logging.getLogger("kubernetes")
k8logger.setLevel(logging.WARNING)

# Logger for this module's own messages.
logger = logging.getLogger(__name__)
def init_k8s_client():
    """
    Load the Kubernetes client configuration.

    The in-cluster configuration is tried first. If that raises a
    ConfigException, a kubeconfig file is loaded instead: the path given in
    the KUBE_CONFIG environment variable if it is set, otherwise
    ``.kube/config`` under the current user's home directory.

    Any exception raised by ``load_kube_config`` for the fallback file is
    propagated to the caller.
    """
    try:
        kubernetes.config.load_incluster_config()
    except kubernetes.config.config_exception.ConfigException as e:
        kube_config_file = os.getenv("KUBE_CONFIG")
        if kube_config_file is None:
            # Only build the default when it is actually needed, and resolve the
            # home directory with expanduser() instead of os.getenv("HOME"):
            # os.path.join(None, ...) raises TypeError when HOME is unset.
            kube_config_file = os.path.join(os.path.expanduser("~"), ".kube", "config")
        logger.warning("Could not load in-cluster configuration: {0}. Trying external connection from {1}...".format(str(e), kube_config_file))
        kubernetes.config.load_kube_config(kube_config_file)
def get_current_namespace(namespace_file="/var/run/secrets/kubernetes.io/serviceaccount/namespace"):
    """
    Read the current Kubernetes namespace from the service-account namespace file.

    :param namespace_file: path of the file to read; defaults to the
        service-account namespace file path used by the original code.
    :return: the file content with surrounding whitespace removed, or None if
        the file could not be opened/read or contains only whitespace.
    """
    try:
        with open(namespace_file) as f:
            # Strip so that a trailing newline does not end up in the namespace name.
            namespace = f.read().strip()
    except IOError as e:
        logger.debug("Could not open namespace secret file: {0}".format(e))
        return None
    # Treat an empty file like a missing one, so callers that test for None
    # can fall back to another source.
    return namespace or None
if __name__ == "__main__":
    # Script entry point: configure the Kubernetes client, work out which
    # namespace to watch, wire up the message sender and journal, then hand
    # control to the job watcher.
    init_k8s_client()

    # Namespace comes from the service-account file when available, otherwise
    # from the NAMESPACE environment variable; with neither we cannot continue.
    namespace = get_current_namespace()
    if namespace is None:
        namespace = os.environ.get("NAMESPACE")
    if namespace is None:
        logger.error("Could not determine namespace from inside cluster, and NAMESPACE was not set in the environment")
        sys.exit(1)

    logger.info("CDSReaper started up, namespace is '{0}'".format(namespace))

    env = os.environ

    # RabbitMQ connection settings, all taken from the environment. The dict
    # keeps the values in the order they are read.
    rmq_options = {
        "host": env.get("RABBITMQ_HOST"),
        "port": int(env.get("RABBITMQ_PORT", 5672)),
        "virtual_host": env.get("RABBITMQ_VHOST", "/"),
        "credentials": pika.PlainCredentials(
            username=env.get("RABBITMQ_USER"),
            password=env.get("RABBITMQ_PASSWD"),
        ),
        "connection_attempts": int(env.get("RABBITMQ_CONNECTION_ATTEMPTS", 3)),
        "retry_delay": int(env.get("RABBITMQ_RETRY_DELAY", 3)),
    }
    rmq_setup = pika.connection.ConnectionParameters(**rmq_options)

    exchange_name = env.get("MY_EXCHANGE", "cdsresponder")
    sender = MessageSender(rmq_setup, exchange_name)

    # Build the journal with a single retry: if we cannot connect at startup we
    # would rather crash, which makes it obvious to monitoring that we are not
    # running yet. Once we are up, allow more retries in order to stay up.
    redis_host = env.get("REDIS_HOST")
    redis_port = int(env.get("REDIS_PORT", 6379))
    redis_db_num = int(env.get("REDIS_DB_NUM", 0))
    redis_password = env.get("REDIS_PASS")
    journal = Journal(redis_host, redis_port, redis_db_num, redis_password, max_retries=1)
    journal.max_retries = 10

    batch_api = kubernetes.client.BatchV1Api()
    job_watcher = JobWatcher(batch_api, sender, journal, namespace)
    job_watcher.run_sync()