nuvolaris/couchdb.py (207 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import kopf, os, logging, json
import nuvolaris.kustomize as kus
import nuvolaris.kube as kube
import nuvolaris.couchdb_util as cu
import nuvolaris.config as cfg
import nuvolaris.couchdb_util
import nuvolaris.util as util
from nuvolaris.user_config import UserConfig
from nuvolaris.user_metadata import UserMetadata
from jinja2 import Environment, FileSystemLoader
# Jinja2 environment used to render the JSON document templates of this
# module. Templates are searched in the listed directories, in order; both
# paths are relative to the working directory of the process.
loader = FileSystemLoader(["./nuvolaris/templates", "./nuvolaris/files"])
env = Environment(loader=loader)
def update_templated_doc(db, database, template, data):
    """
    Render a JSON template and store the resulting document.

    The template named `template` is looked up in the module-level Jinja2
    environment and rendered with `data`; the rendered text is parsed as JSON
    and passed to `db.update_doc` for the given `database`.

    Returns whatever `db.update_doc` returns.
    """
    rendered = env.get_template(template).render(data)
    return db.update_doc(database, json.loads(rendered))
def create(owner=None):
    """
    Build the CouchDB kubernetes specification and apply it.

    The template data is assembled from the operator configuration, the
    stateful set template is rendered, and the kustomization (auth secret plus
    patches) is expanded into the list of resources that is finally applied.

    owner: optional kopf owner; when given it is attached to the generated
           items, otherwise the spec is stored under "state.couchdb.spec" so
           that delete() can find it later.

    Returns the result of `kube.apply`; when it is truthy the function first
    waits for the couchdb pod to become ready.
    """
    logging.info("create couchdb")

    runtime = cfg.get('nuvolaris.kube')
    admin_user = cfg.get('couchdb.admin.user', "COUCHDB_ADMIN_USER", "whisk_admin")
    admin_password = cfg.get('couchdb.admin.password', "COUCHDB_ADMIN_PASSWORD", "some_passw0rd")

    operator_image = cfg.get('operator.image') or "missing-operator-image"
    operator_tag = cfg.get('operator.tag') or "missing-operator-tag"

    # openshift gets the nuvolaris build of the image, every other runtime
    # the upstream one
    if runtime == 'openshift':
        container_image = "ghcr.io/nuvolaris/couchdb:2.3.1-nuvolaris.23101915"
    else:
        container_image = "apache/couchdb:2.3"

    data = {
        "runtime": runtime,
        "container_image": container_image,
        "image": f"{operator_image}:{operator_tag}",
        "config": json.dumps(cfg.getall()),
        "name": "couchdb",
        "container": "couchdb",
        "size": cfg.get("couchdb.volume-size", "COUCHDB_VOLUME_SIZE", 10),
        "dir": "/opt/couchdb/data",
        "storageClass": cfg.get("nuvolaris.storageclass"),
        "container_cpu_req": cfg.get('configs.couchdb.resources.cpu-req') or "500m",
        "container_cpu_lim": cfg.get('configs.couchdb.resources.cpu-lim') or "1",
        "container_mem_req": cfg.get('configs.couchdb.resources.mem-req') or "1G",
        "container_mem_lim": cfg.get('configs.couchdb.resources.mem-lim') or "2G",
        "container_manage_resources": cfg.exists('configs.couchdb.resources.cpu-req'),
        "index": "1",
        "replicationRole": "primary",
        "appName": "nuvolaris-couchdb",
    }

    # the helper is expected to add the 'affinity' and 'tolerations' entries
    # read just below
    util.couch_affinity_tolerations_data(data)

    patch_templates = ["set-attach.yaml"]
    if data['affinity'] or data['tolerations']:
        patch_templates.append("affinity-tolerance-sts-core-attach.yaml")

    kus.processTemplate("couchdb", "couchdb-set-tpl.yaml", data, "couchdb-set_generated.yaml")

    kustomization = kus.secretLiteral(
        "couchdb-auth",
        f"db_username={admin_user}",
        f"db_password={admin_password}",
    )
    kustomization += kus.patchTemplates("couchdb", patch_templates, data)

    spec = kus.restricted_kustom_list(
        "couchdb",
        kustomization,
        templates=["couchdb-init.yaml"],
        templates_filter=["couchdb-set_generated.yaml", "couchdb-svc.yaml"],
        data=data,
    )

    if owner:
        kopf.append_owner_reference(spec['items'], owner)
    else:
        cfg.put("state.couchdb.spec", spec)

    res = kube.apply(spec)
    if res:
        # dynamically detect couchdb pod and wait for readiness
        util.wait_for_pod_ready("{.items[?(@.metadata.labels.name == 'couchdb')].metadata.name}")
    return res
def delete():
    """
    Delete the CouchDB resources recorded under "state.couchdb.spec".

    Returns False when no spec has been stored, otherwise the result of
    `kube.delete`.
    """
    spec = cfg.get("state.couchdb.spec")
    if not spec:
        return False

    res = kube.delete(spec)
    logging.info(f"delete couchdb: {res}")
    return res
def check(f, what, res):
    """
    Log the outcome of a step and fold it into the accumulated result.

    f:    outcome of the step just executed (truthy means success)
    what: label of the step, used in the log message
    res:  result accumulated from the previous steps

    Returns False when the step failed. When the step succeeded the
    accumulated result is propagated: True if `res` is truthy, `res` itself
    (e.g. False or None) otherwise.
    """
    if f:
        logging.info(f"OK: {what}")
        return res and True

    # logging.warn is a deprecated alias that was removed in Python 3.13
    logging.warning(f"ERR: {what}")
    return False
def init_system(db):
    """
    Configure the CouchDB node and create the controller and invoker users.

    All steps are executed even when an earlier one fails; the returned value
    is truthy only if every step succeeded.
    """
    ok = check(db.wait_db_ready(60), "wait_db_ready", True)
    ok = check(db.configure_single_node(), "configure_single_node", ok)
    ok = check(db.configure_no_reduce_limit(), "configure_no_reduce_limit", ok)

    controller_user = cfg.get('couchdb.controller.user', "COUCHDB_CONTROLLER_USER", "controller_admin")
    controller_password = cfg.get('couchdb.controller.password', "COUCHDB_CONTROLLER_PASSWORD", "s0meP@ass1")
    invoker_user = cfg.get('couchdb.invoker.user', "COUCHDB_INVOKER_USER", "invoker_admin")
    invoker_password = cfg.get('couchdb.invoker.password', "COUCHDB_INVOKER_PASSWORD", "s0meP@ass2")

    ok = check(db.add_user(controller_user, controller_password), "add_user: controller", ok)
    ok = check(db.add_user(invoker_user, invoker_password), "add_user: invoker", ok)
    return ok
def init_subjects(db):
    """
    Create the "subjects" database, register the controller and invoker users
    on it through `db.add_role` and load its design documents.

    All steps are executed even when an earlier one fails; the returned value
    is truthy only if every step succeeded.
    """
    subjects_design_docs = [
        "auth_design_document_for_subjects_db_v2.0.0.json",
        "filter_design_document.json",
        "namespace_throttlings_design_document_for_subjects_db.json",
    ]
    dbn = "subjects"

    res = check(db.wait_db_ready(60), "wait_db_ready", True)
    res = check(db.create_db(dbn), "create_db: subjects", res)

    # Resolve the user names with the same cfg.get arguments init_system uses
    # when it creates the users, so the role members always match the accounts
    # that actually exist (a bare cfg.get(key) ignores the fallbacks).
    members = [
        cfg.get('couchdb.controller.user', "COUCHDB_CONTROLLER_USER", "controller_admin"),
        cfg.get('couchdb.invoker.user', "COUCHDB_INVOKER_USER", "invoker_admin"),
    ]
    res = check(db.add_role(dbn, members), "add_role: subjects", res)

    for design_doc in subjects_design_docs:
        res = check(update_templated_doc(db, dbn, design_doc, {}), f"add {design_doc}", res)
    return res
def init_activations(db):
    """
    Create the "activations" database, register the controller and invoker
    users on it through `db.add_role` and load its design documents.

    All steps are executed even when an earlier one fails; the returned value
    is truthy only if every step succeeded.
    """
    activations_design_docs = [
        "whisks_design_document_for_activations_db_v2.1.0.json",
        "whisks_design_document_for_activations_db_filters_v2.1.1.json",
        "filter_design_document.json",
        "activations_design_document_for_activations_db.json",
        "logCleanup_design_document_for_activations_db.json",
    ]
    dbn = "activations"

    res = check(db.wait_db_ready(60), "wait_db_ready", True)
    res = check(db.create_db(dbn), "create_db: activations", res)

    # Resolve the user names with the same cfg.get arguments init_system uses
    # when it creates the users, so the role members always match the accounts
    # that actually exist (a bare cfg.get(key) ignores the fallbacks).
    members = [
        cfg.get('couchdb.controller.user', "COUCHDB_CONTROLLER_USER", "controller_admin"),
        cfg.get('couchdb.invoker.user', "COUCHDB_INVOKER_USER", "invoker_admin"),
    ]
    res = check(db.add_role(dbn, members), "add_role: activations", res)

    for design_doc in activations_design_docs:
        res = check(update_templated_doc(db, dbn, design_doc, {}), f"add {design_doc}", res)
    return res
def init_actions(db):
    """
    Create the "whisks" database, register the controller and invoker users
    on it through `db.add_role` and load its design documents.

    All steps are executed even when an earlier one fails; the returned value
    is truthy only if every step succeeded.
    """
    whisks_design_docs = [
        "whisks_design_document_for_entities_db_v2.1.0.json",
        "filter_design_document.json",
    ]
    dbn = "whisks"

    res = check(db.wait_db_ready(60), "wait_db_ready", True)
    res = check(db.create_db(dbn), "create_db: whisks", res)

    # Resolve the user names with the same cfg.get arguments init_system uses
    # when it creates the users, so the role members always match the accounts
    # that actually exist (a bare cfg.get(key) ignores the fallbacks).
    members = [
        cfg.get('couchdb.controller.user', "COUCHDB_CONTROLLER_USER", "controller_admin"),
        cfg.get('couchdb.invoker.user', "COUCHDB_INVOKER_USER", "invoker_admin"),
    ]
    res = check(db.add_role(dbn, members), "add_role: actions", res)

    for design_doc in whisks_design_docs:
        res = check(update_templated_doc(db, dbn, design_doc, {}), f"add {design_doc}", res)
    return res
def add_initial_subjects(db):
    """
    Add one subject document for every entry of "openwhisk.namespaces".

    Each configured value must have the form "uuid:key" (exactly one colon,
    otherwise the unpacking raises ValueError); the subject name is the last
    dot-separated component of the configuration key.

    Returns a truthy value only if the database was ready and every document
    was stored.
    """
    res = check(db.wait_db_ready(60), "wait_db_ready", True)

    namespaces = cfg.getall("openwhisk.namespaces")
    for name, value in namespaces.items():
        uuid, key = value.split(":")
        subject = {"name": name.split(".")[-1], "key": key, "uuid": uuid}
        res = check(update_templated_doc(db, "subjects", "subject.json", subject), f"add {name}", res)
    return res
def init():
    """
    Initialize CouchDB: system configuration, databases, initial subjects,
    users metadata and compaction settings.

    When the NUVOLARIS_CONFIG environment variable is set, its JSON content is
    loaded into the configuration (and dumped to the log) before starting.

    Returns the negation of the overall result, i.e. a falsy value when every
    step succeeded.
    """
    # load nuvolaris config from the named crd
    raw_config = os.environ.get("NUVOLARIS_CONFIG")
    if raw_config:
        logging.basicConfig(level=logging.INFO)
        cfg.configure(json.loads(raw_config))
        for key in cfg.getall():
            logging.info(f"{key} = {cfg.get(key)}")

    # dynamically detect couchdb pod and wait for readiness
    util.wait_for_pod_ready("{.items[?(@.metadata.labels.name == 'couchdb')].metadata.name}")

    db = nuvolaris.couchdb_util.CouchDB()

    # every step runs, in this order, regardless of earlier failures
    steps = (
        (init_system, "init_system"),
        (init_subjects, "init_subjects"),
        (init_activations, "init_activations"),
        (init_actions, "init_actions"),
        (add_initial_subjects, "add_subjects"),
        (init_users_metadata, "init_users_metadata"),
        (init_compactions_config, "init_compactions_config"),
    )
    res = True
    for step, label in steps:
        res = check(step(db), label, res)

    # job process status code should be negated if the job is successful
    return not res
def add_subject(db, namespace, auth):
    """
    Store a subject document authorizing `namespace` in the "subjects" database.

    auth is the subject authentication in the form uuid:key, e.g.
    uuid.uuid4():randomstr(64); it must contain exactly one colon, otherwise
    the unpacking raises ValueError.

    Returns a truthy value only if the database was ready and the document
    was stored.
    """
    res = check(db.wait_db_ready(60), "wait_db_ready", True)

    uuid, key = auth.split(":")
    subject = {"name": namespace, "key": key, "uuid": uuid}
    stored = update_templated_doc(db, "subjects", "subject.json", subject)
    return check(stored, f"add {namespace}", res)
def init_users_metadata(db):
    """
    Create the "users_metadata" database, which hosts the information
    relevant to nuvolaris users.

    Returns a truthy value only if the database was ready and the creation
    succeeded.
    """
    ready = check(db.wait_db_ready(60), "wait_db_ready", True)
    return check(db.create_db("users_metadata"), "create_db: user_metadata", ready)
def init_compactions_config(db):
    """
    Activate the compactions config for the nuvolaris related databases.

    Compaction is enabled for "users_metadata", "subjects" and "whisks". All
    steps are executed even when an earlier one fails; the returned value is
    truthy only if every step succeeded.
    """
    res = check(db.wait_db_ready(60), "wait_db_ready", True)

    # (database, label used in the log message)
    # The second entry used to target "users_subjects", a name no other code
    # in this module uses; the subjects database is called "subjects".
    targets = (
        ("users_metadata", "user_metadata"),
        ("subjects", "subjects"),
        ("whisks", "whisks"),
    )
    for dbn, label in targets:
        res = check(db.enable_db_compaction(dbn), f"enable_db_compaction: {label}", res)
    return res
def create_ow_user(ucfg: UserConfig, user_metadata: UserMetadata):
    """
    Authorize the OpenWhisk namespace described by `ucfg`.

    The "namespace" and "auth" entries are read from `ucfg`; when the auth
    passes `util.validate_ow_auth` a subject document is added and, if that
    succeeds, the auth is recorded in `user_metadata` under "AUTH".

    Returns the result of `add_subject`, or None when the auth is not valid
    or any exception is raised (the exception is logged, not propagated).
    """
    namespace = ucfg.get("namespace")
    auth = ucfg.get("auth")
    logging.info(f"authorizing OpenWhisk namespace {namespace}")

    try:
        db = nuvolaris.couchdb_util.CouchDB()

        if not util.validate_ow_auth(auth):
            return None

        logging.info(f"{namespace} authorization is valid, adding subject")
        added = add_subject(db, namespace, auth)
        if added:
            user_metadata.add_metadata("AUTH", auth)
        return added
    except Exception as e:
        logging.error(f"failed to authorize Openwhisk namespace {namespace} authorization id and key: {e}")
        return None
def delete_ow_user(subject):
    """
    Remove the subject document authorizing the OpenWhisk namespace `subject`.

    The "subjects" database is queried for documents whose "subject" field
    equals `subject`; the first match, if any, is deleted.

    Returns the result of `db.delete_doc` when a document was found, None
    when nothing matched or any exception is raised (the exception is logged,
    not propagated).
    """
    logging.info(f"removing auhorization for OpenWhisk namespace {subject}")
    try:
        db = nuvolaris.couchdb_util.CouchDB()
        selector = {"selector": {"subject": {"$eq": subject}}}
        response = db.find_doc("subjects", json.dumps(selector))

        docs = response['docs']
        if docs:
            doc = list(docs)[0]
            logging.info(f"removing subjects documents {doc['_id']}")
            return db.delete_doc("subjects", doc['_id'])

        # logging.warn is a deprecated alias that was removed in Python 3.13
        logging.warning(f"auhorization for OpenWhisk namespace {subject} not found!")
        return None
    except Exception as e:
        logging.error(f"failed to remove Openwhisk namespace {subject} authorization id and key: {e}")
        return None