nuvolaris/testutil.py (182 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import yaml
import re
import flatdict
import json
import time
import os
import requests as req
from subprocess import run
from urllib.parse import urlparse
import uuid
import string
import random
import logging
# takes a string, split in lines and search for the word (a re)
# if field is a number, splits the line in fields separated by spaces and print the selected field
# the output is always space trimmed for easier check
def grep(input, word, field=None, sort=False):
r"""
>>> import nuvolaris.testutil as tu
>>> tu.grep("a\nb\nc\n", "b")
b
>>> tu.grep(b"a\nb\n c\n", r"a|c")
a
c
>>> tu.grep(b"z\nt\n w\n", r"w|z", sort=True)
w
z
"""
try: input = input.decode()
except: pass
lines = []
for line in str(input).split("\n"):
if re.search(word, line):
line = line.strip()
if not field is None:
try:
line = line.split()[field]
except:
line = "missing-field"
lines.append(line)
if sort:
lines.sort()
res = "\n".join(lines)
print(res)
# print a file
def cat(file):
with open(file, "r") as f:
print(f.read())
# print a file
def fread(file):
with open(file, "r") as f:
return f.read()
# capture and print an exception with its type
# or just print the output of the fuction
def catch(f):
"""
>>> import nuvolaris.testutil as tu
>>> tu.catch(lambda: "ok")
ok
>>> def error():
... raise Exception("error")
>>> tu.catch(error)
<class 'Exception'> error
"""
try: print(f().strip())
except Exception as e:
print(type(e), str(e).strip())
# print not blank lines only
def nprint(out):
for line in out.split("\n"):
if line.strip() != "":
print(line)
# print in yaml an obj
def yprint(obj):
print(yaml.dump(obj))
# load an YAML file
def load_yaml(file):
f = open(file)
l = list(yaml.load_all(f, yaml.Loader))
if len(l) > 0:
return l[0]
return {}
# mocking and spying kube support
class MockKube:
"""
>>> from nuvolaris.testutil import *
>>> m = MockKube()
>>> m.invoke()
>>> m.config("", "ok")
>>> m.invoke()
'ok'
>>> m = MockKube()
>>> m.config("apply", "applied")
>>> m.invoke()
>>> m.echo()
>>> m.invoke("apply", "-f")
kubectl apply -f
'applied'
>>> m.peek()
'apply -f'
>>> m.dump()
''
>>> m.save("hello")
>>> m.dump()
'hello'
"""
def __init__(self):
self.reset()
def reset(self):
self.map = {}
self.queue = []
self.saved = []
self.echoFlag = False
self.enabled = False
def echo(self, flag=True):
self.echoFlag = flag
def peek(self, index=-1):
res = self.queue[index][0]
return res
def dump(self, index=-1):
return self.queue[index][1]
def save(self, data, index=-1):
self.queue[index] = (self.queue[index][0], data)
def config(self, request, response):
self.enabled = True
self.map[request] = response
def invoke(self, *args):
if self.enabled:
cmd = " ".join(args)
for key in list(self.map.keys()):
if cmd.startswith(key):
if self.echoFlag:
print("kubectl", cmd)
self.queue.append( (cmd,"") )
return self.map[key]
return None
def load_sample_config(name="whisk"):
with open(f"tests/{name}.yaml") as f:
c = yaml.safe_load(f)
return c['spec']
# read environment variables from Dockerfile, .env and git config
def load_image_env():
# operator images defaults in Dockerfile, can be overriden in .env
r = run('grep "ARG OPERATOR_IMAGE_DEFAULT=" Dockerfile', shell=True, capture_output=True)
opimg = r.stdout.strip().decode("ascii").split("=")[-1]
r = run('grep MY_OPERATOR_IMAGE .env', shell=True, capture_output=True)
if r.returncode == 0:
opimg = r.stdout.strip().decode("ascii").split("=")[-1]
g = run('grep GITHUB_USER= .env', shell=True, capture_output=True)
if g.returncode == 0 and opimg:
ghuser = g.stdout.strip().decode("ascii").split("=")[-1]
opimg = opimg.replace("${GITHUB_USER}",ghuser)
# gives precedence to env variable if set
os.environ["OPERATOR_IMAGE"] = os.getenv("MY_OPERATOR_IMAGE", opimg)
# operator tag is the git tag of the operator
r = run('git describe --tags --abbrev=0 2>/dev/null || git rev-parse --short HEAD', shell=True, capture_output=True)
tag = r.stdout.strip().decode("ascii")
os.environ["OPERATOR_TAG"] = tag
# controller images and tag are in the Dockerfile
r = run("grep -Po '(?<=CONTROLLER_IMAGE=).*' Dockerfile", shell=True, capture_output=True)
os.environ["CONTROLLER_IMAGE"] = r.stdout.decode("ascii").strip()
r = run("grep -Po '(?<=CONTROLLER_TAG=).*' Dockerfile", shell=True, capture_output=True)
os.environ["CONTROLLER_TAG"] = r.stdout.decode("ascii").strip()
def set_apihost_from_kubeconfig(cfg):
# assume for k3s and microk8s,
# the nuvolaris apihost is the same as the kube api server
r = run("kubectl config view -o json | jq -r '.clusters[0].cluster.server'", shell=True, capture_output=True)
server = r.stdout.decode("ascii").strip()
hostname = urlparse(server).hostname
kube = cfg.get("nuvolaris.kube")
if kube is None:
return
if kube in ["k3s", "microk8s"]:
cfg.put("nuvolaris.apihost", hostname)
if kube == "openshift":
hostname1 = ".".join(["nuvolaris", "apps"] + hostname.split(".")[1:])
cfg.put("nuvolaris.apihost", hostname1)
def json2flatdict(data):
return dict(flatdict.FlatterDict(json.loads(data), delimiter="."))
def get_by_key_sub(dic, key):
res = []
for k in list(dic.keys()):
try:
k.index(key)
res.append(dic[k])
except:
pass
return "\n".join(res)
def read_dotenv():
import os
try:
f = open(".env")
lines = f.readlines()
#print(lines)
for line in lines:
#print(line)
#line = lines[1]
a = line.split("=", 1)
if len(a) == 2:
print(a[0])
os.environ[a[0]] = a[1].strip()
f.close()
except Exception as e:
print(e)
print(".env not found")
pass
def get_with_retry(url, max_seconds):
start = time.time()
delta = 0
while delta < max_seconds:
try:
r = req.get(url, timeout=1)
if r.status_code == 200:
return r.text
except Exception as e:
print(e)
print(f"waiting since: {delta} seconds")
delta = int(time.time() - start)
time.sleep(1)
return ""
# retry a function until it returns a given value
# return true when the value is what is expected, false otherwise
def retry(fn, value, max=10, delay=1):
for i in range(0, max):
if fn() == value:
return True
time.sleep(delay)
print(i, "retrying...")
return False
def generate_ow_uid():
"""
>>> import nuvolaris.testutil as util
>>> len(util.generate_ow_uid())
36
"""
return str(uuid.uuid4())
def generate_ow_key():
"""
>>> import nuvolaris.testutil as util
>>> len(util.generate_ow_key())
64
"""
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(64))
def generate_ow_auth():
"""
>>> import nuvolaris.testutil as util
>>> len(util.generate_ow_auth())
101
"""
uid = generate_ow_uid()
key = generate_ow_key()
return f"{uid}:{key}"
def load_sample_user_config(name="whisk-user"):
with open(f"tests/{name}.yaml") as f:
c = yaml.safe_load(f)
return c['spec']
def load_sample_runtimes(name="runtimes"):
with open(f"tests/{name}.json") as f:
return json.load(f)
def enable_debug_logging():
logging.basicConfig(level=logging.DEBUG)