nuvolaris/redis.py (174 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import nuvolaris.kustomize as kus
import nuvolaris.kube as kube
import nuvolaris.config as cfg
import nuvolaris.template as ntp
import nuvolaris.util as util
import nuvolaris.openwhisk as openwhisk
import urllib.parse
import os, os.path
import logging, json
import kopf
import nuvolaris.operator_util as operator_util

from nuvolaris.user_config import UserConfig
from nuvolaris.user_metadata import UserMetadata

# JSONPath selectors used to locate the redis pod and service through the
# util helpers, plus the template rendered for every user-management script.
_REDIS_POD_SELECTOR = "{.items[?(@.metadata.labels.name == 'redis')].metadata.name}"
_REDIS_SERVICE_SELECTOR = "{.items[?(@.spec.selector.name == 'redis')]}"
_REDIS_USER_TEMPLATE = "redis_manage_user_tpl.txt"


def _redis_connection_info(namespace, password):
    """
    Looks up the redis service and builds the connection details for a user.

    :param namespace: name used as the redis username.
    :param password: clear-text password of that user.
    :return: a dict with the keys ``url``, ``alt_url``, ``service`` and
             ``port``, or None when no redis service is found.
    """
    redis_service = util.get_service(_REDIS_SERVICE_SELECTOR)
    if not redis_service:
        return None

    service_name = redis_service['metadata']['name']
    service_port = redis_service['spec']['ports'][0]['port']

    # safe='' so that "/" is percent-encoded too: the default (safe='/')
    # leaves it untouched, which yields a malformed URL userinfo section.
    username = urllib.parse.quote(namespace, safe='')
    quoted_password = urllib.parse.quote(password, safe='')
    endpoint = f"{service_name}:{service_port}"

    return {
        "url": f"redis://{username}:{quoted_password}@{endpoint}",
        "alt_url": f"redis://{quoted_password}@{endpoint}",
        "service": service_name,
        "port": service_port,
    }


def _remove_local_script(path_to_script):
    """
    Best-effort removal of a rendered script from the local filesystem.
    A failure is logged and never raised, so it cannot mask the outcome
    of the command the script was rendered for.
    """
    try:
        os.remove(path_to_script)
    except OSError as e:
        logging.warning(f"could not remove redis script {path_to_script}: {e}")


def _run_user_script(namespace, data):
    """
    Renders the user-management template with the given data and runs it
    inside the redis pod.

    :param namespace: namespace used to name the rendered script.
    :param data: values passed to the template.
    :return: the result of exec_redis_command, or None when no redis pod
             is found.
    """
    path_to_script = render_redis_script(namespace, _REDIS_USER_TEMPLATE, data)
    pod_name = util.get_pod_name(_REDIS_POD_SELECTOR)

    if not pod_name:
        # the rendered script embeds the user password: do not leave it around
        _remove_local_script(path_to_script)
        return None

    return exec_redis_command(pod_name, path_to_script)


def _add_redis_user_metadata(ucfg: UserConfig, user_metadata: UserMetadata):
    """
    adds an entry for the redis connectivity, i.e something like
    "redis://{namespace}:{auth}@redis"}

    Nothing is added when the redis service is not found. Errors are logged
    and never raised; the function always returns None.
    """
    try:
        info = _redis_connection_info(ucfg.get('namespace'), ucfg.get('redis.password'))

        if info:
            user_metadata.add_metadata("REDIS_URL", info["url"])
            user_metadata.add_metadata("REDIS_ALT_URL", info["alt_url"])
            user_metadata.add_metadata("REDIS_SERVICE", info["service"])
            user_metadata.add_metadata("REDIS_PORT", info["port"])
            user_metadata.add_metadata("REDIS_PROVIDER", "valkey")
            user_metadata.add_metadata("REDIS_PASSWORD", ucfg.get('redis.password'))

        return None
    except Exception as e:
        logging.error(f"failed to build redis_url for {ucfg.get('namespace')}: {e}")
        return None


def create(owner=None):
    """
    Deploys redis, waits for its pod to be ready and authorizes the
    nuvolaris user.

    :param owner: optional owner; when given it is appended as owner
                  reference to the deployed items, otherwise the spec is
                  stored under "state.redis.spec" for a later delete.
    :return: the result of kube.apply.
    """
    logging.info("create redis")
    data = util.get_redis_config_data()

    tplp = ["security-set-attach.yaml", "redis-set.yaml"]

    if data['persistence']:
        tplp.append("set-attach.yaml")

    if data['affinity'] or data['tolerations']:
        tplp.append("affinity-tolerance-sts-core-attach.yaml")

    kust = kus.patchTemplates("redis", tplp, data)
    spec = kus.kustom_list("redis", kust, templates=["redis-conf.yaml"], data=data)

    if owner:
        kopf.append_owner_reference(spec['items'], owner)
    else:
        cfg.put("state.redis.spec", spec)

    res = kube.apply(spec)

    wait_for_redis_ready()
    create_nuvolaris_db_user(data)

    logging.info(f"create redis: {res}")
    return res


def wait_for_redis_ready():
    """
    Waits until the pod labelled name=redis is ready.
    """
    # dynamically detect redis pod and wait for readiness
    util.wait_for_pod_ready(_REDIS_POD_SELECTOR)


def restore_nuvolaris_db_user():
    """
    Re-creates the nuvolaris redis user from the current redis configuration.
    """
    data = util.get_redis_config_data()
    create_nuvolaris_db_user(data)


def create_nuvolaris_db_user(data):
    """
    Creates the redis user described by data and, on success, stores the
    connection details as openwhisk annotations.

    :param data: redis configuration; 'namespace', 'password' and 'prefix'
                 are read and 'mode' is set to "create" (the dict is mutated).
    :return: the result of the redis script execution, or None when no redis
             pod is found or an error occurs.
    """
    logging.info("authorizing redis for namespace nuvolaris")
    try:
        data['mode'] = "create"
        res = _run_user_script(data['namespace'], data)

        if res:
            info = _redis_connection_info(data['namespace'], data['password'])

            if info:
                openwhisk.annotate(f"redis_url={info['url']}")
                openwhisk.annotate(f"redis_prefix={data['prefix']}")
                openwhisk.annotate(f"redis_alt_url={info['alt_url']}")
                openwhisk.annotate(f"redis_service={info['service']}")
                openwhisk.annotate(f"redis_port={info['port']}")
                openwhisk.annotate(f"redis_password={data['password']}")
                openwhisk.annotate("redis_provider=valkey")
                logging.info("*** saved annotation for redis nuvolaris user")

        return res
    except Exception as e:
        logging.error(f"failed to add redis namespace {data['namespace']}: {e}")
        return None


def delete_by_owner():
    """
    Deletes redis using the spec built by kustomize.
    """
    spec = kus.build("redis")
    res = kube.delete(spec)
    logging.info(f"delete redis: {res}")
    return res


def delete_by_spec():
    """
    Deletes redis using the spec stored under "state.redis.spec".
    Returns False when no spec has been stored.
    """
    spec = cfg.get("state.redis.spec")
    res = False
    if spec:
        res = kube.delete(spec)
        logging.info(f"delete redis: {res}")
    return res


def delete(owner=None):
    """
    Deletes redis, by owner when one is given, by stored spec otherwise.
    """
    if owner:
        return delete_by_owner()
    else:
        return delete_by_spec()


def render_redis_script(namespace, template, data):
    """
    uses the given template to render a redis-cli script to be executed.

    :return: the absolute path of the rendered script.
    """
    # NOTE(review): predictable file name under /tmp holding credentials;
    # consider tempfile if the location is not relied upon elsewhere.
    out = f"/tmp/__{namespace}_{template}"
    file = ntp.spool_template(template, out, data)
    return os.path.abspath(file)


def exec_redis_command(pod_name, path_to_script):
    """
    Copies the script into the given pod and pipes it into redis-cli.

    The local script is removed in every case, including when kubectl
    raises, because it contains the user credentials.

    :return: the result of the kubectl exec call.
    """
    logging.info(f"passing script {path_to_script} to pod {pod_name}")
    try:
        kube.kubectl("cp", path_to_script, f"{pod_name}:{path_to_script}")
        return kube.kubectl("exec", "-it", pod_name, "--", "/bin/bash", "-c", f"cat {path_to_script} | redis-cli")
    finally:
        _remove_local_script(path_to_script)


def create_db_user(ucfg: UserConfig, user_metadata: UserMetadata, read_only_mode = False):
    """
    Creates a redis user for the namespace described by ucfg and records
    the connection details into user_metadata.

    :param ucfg: user configuration; 'namespace', 'redis.prefix' and
                 'redis.password' are read.
    :param user_metadata: receives REDIS_PREFIX and the connection metadata.
    :param read_only_mode: when true the "create_readonly" mode is used.
    :return: the result of the redis script execution, or None on failure.
    """
    logging.info(f"authorizing new redis namespace {ucfg.get('namespace')}")
    try:
        wait_for_redis_ready()
        data = util.get_redis_config_data()

        # if prefix not provided defaults to user namespace
        prefix = ucfg.get('redis.prefix') or ucfg.get('namespace')

        if not prefix.endswith(":"):
            prefix = f"{prefix}:"

        data['prefix'] = prefix
        data['namespace'] = ucfg.get('namespace')
        data['password'] = ucfg.get('redis.password')
        data['mode'] = "create"

        if read_only_mode:
            logging.warning(f"activating {prefix} in read-only mode")
            data['mode'] = "create_readonly"

        res = _run_user_script(ucfg.get('namespace'), data)

        if res:
            user_metadata.add_metadata("REDIS_PREFIX", prefix)
            _add_redis_user_metadata(ucfg, user_metadata)
            return res

        # reached both when no redis pod is found and when the script fails
        logging.error(f"failed to add redis namespace {ucfg.get('namespace')}")
        return None
    except Exception as e:
        logging.error(f"failed to add redis namespace {ucfg.get('namespace')}: {e}")
        return None


def delete_db_user(namespace):
    """
    Removes the redis user of the given namespace.

    :return: the result of the redis script execution, or None when no redis
             pod is found or an error occurs.
    """
    logging.info(f"removing redis namespace {namespace}")
    try:
        data = util.get_redis_config_data()
        data["namespace"] = namespace
        data["mode"] = "delete"
        return _run_user_script(namespace, data)
    except Exception as e:
        logging.error(f"failed to remove redis namespace {namespace}: {e}")
        return None


def patch(status, action, owner=None):
    """
    Called by the operator patcher to create/delete redis.

    The 'redis' operator status is set to 'on' after a create, 'off' after
    any other action and 'error' when the action raises.
    """
    try:
        logging.info(f"*** handling request to {action} redis")
        if action == 'create':
            msg = create(owner)
            operator_util.patch_operator_status(status, 'redis', 'on')
        else:
            msg = delete(owner)
            operator_util.patch_operator_status(status, 'redis', 'off')

        logging.info(msg)
        logging.info(f"*** hanlded request to {action} redis")
    except Exception as e:
        logging.error('*** failed to update redis: %s' % e)
        operator_util.patch_operator_status(status, 'redis', 'error')