nuvolaris/kube.py (99 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# this module wraps kubectl
import nuvolaris.testutil as tu
import nuvolaris.template as tpl
import subprocess
import json
import logging
import yaml
output = ""
error = ""
returncode = -1
dry_run = False
mocker = tu.MockKube()
# execute kubectl commands
# default namespace is nuvolaris, you can change with keyword arg namespace
# default output is text
# if you specify jsonpath it will filter and parse the json output
# returns exceptions if errors
def kubectl(*args, namespace="nuvolaris", input=None, jsonpath=None, debugresult=True):
# support for mocked requests
mres = mocker.invoke(*args)
if mres:
mocker.save(input)
return mres
cmd = namespace and ["kubectl", "-n", namespace] or ["kubectl"]
cmd += list(args)
if jsonpath:
cmd += ["-o", "jsonpath-as-json=%s" % jsonpath]
# if is a string, convert input in bytes
try: input = input.encode('utf-8')
except: pass
# executing
logging.debug(cmd)
res = subprocess.run(cmd, capture_output=True, input=input)
global returncode, output, error
returncode = res.returncode
output = res.stdout.decode()
error = res.stderr.decode()
if res.returncode == 0:
if jsonpath:
try:
parsed = json.loads(output)
if debugresult:
logging.debug("result: %s", json.dumps(parsed, indent=2))
return parsed
except Exception as e:
logging.info(output)
logging.info(e)
return e
else:
return output
logging.info(f"Error: kubectl f{cmd} input='{input}' output='{output}' error='{error}'")
raise Exception(error)
# create a configmap from keyword arguments
def configMap(name, **kwargs):
"""
>>> import nuvolaris.kube as kube, nuvolaris.testutil as tu
>>> tu.grep(kube.configMap("hello", value="world"), "kind:|name:|value:", sort=True)
kind: ConfigMap
name: hello
value: world
>>> tu.grep(kube.configMap("hello", **{"file.js":"function", "file.py": "def"}), "file.", sort=True)
file.js: function
file.py: def
"""
out = yaml.safe_load("""apiVersion: v1
kind: ConfigMap
metadata:
name: %s
data: {}
"""% name)
for key, value in kwargs.items():
out['data'][key] = value
return yaml.dump(out)
# delete an object
def delete(obj, namespace="nuvolaris"):
# tested with apply
if not isinstance(obj, str):
obj = json.dumps(obj)
return kubectl("delete", "-f", "-", namespace=namespace, input=obj)
# shortcut
def ctl(arg, jsonpath='{@}', flatten=False):
import flatdict, json
data = kubectl(*arg.split(), jsonpath=jsonpath)
if flatten:
return dict(flatdict.FlatterDict(data, delimiter="."))
return data
# apply an object
def apply(obj, namespace="nuvolaris"):
if not isinstance(obj, str):
obj = json.dumps(obj)
return kubectl("apply", "-f", "-", namespace=namespace, input=obj)
# apply an expanded template
def applyTemplate(name, data, namespace="nuvolaris"):
obj = tpl.expand_template(name, data)
return kubectl("apply", "-f", "-", namespace=namespace, input=obj)
# delete an expanded template
def deleteTemplate(name, data, namespace="nuvolaris"):
obj = tpl.expand_template(name, data)
return kubectl("delete", "-f", "-", namespace=namespace, input=obj)
def get(name, namespace="nuvolaris"):
try:
return json.loads(kubectl("get", name, "-ojson", namespace=namespace))
except:
return None
def get_pods(selector, namespace="nuvolaris"):
"""
filter the existing pods using the given selector expression. (ex name=mongodb-kubernetes-operator)
"""
try:
return json.loads(kubectl("get", "pods", f"--selector={selector}","-ojson",namespace=namespace))
except:
return None
def wait(name, condition, timeout="600s", namespace="nuvolaris"):
try:
return kubectl("wait", name, f"--for={condition}", f"--timeout={timeout}",namespace=namespace)
except:
return None
# patch an object
def patch(name, data, namespace="nuvolaris", tpe="merge"):
if not type(data) == str:
data = json.dumps(data)
res = kubectl("patch", name, "--type", tpe, "-p", data)
return res
def scale_sts(name, replicas, namespace="nuvolaris"):
try:
return kubectl("scale", name, f"--replicas={replicas}" ,namespace=namespace)
except:
return None
# rollout the specified element. Normally used for DeamonSet or StatefulSet
def rollout(name, namespace="nuvolaris"):
try:
return kubectl("rollout", "restart", name, namespace=namespace)
except:
return None