# nuvolaris/util.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# this module wraps utilities functions
import logging
import math
import random
import time
import uuid
from base64 import b64decode, b64encode
from typing import List, Union
from urllib.parse import urlparse
import urllib3
from urllib3.exceptions import NewConnectionError, MaxRetryError, ProtocolError
import nuvolaris.apihost_util as apihost_util
import nuvolaris.config as cfg
import nuvolaris.kube as kube
# Implements truncated exponential backoff from
# https://cloud.google.com/storage/docs/retry-strategy#exponential-backoff
def nuv_retry(deadline_seconds=120, max_backoff=5):
    """Decorator retrying the wrapped function with truncated exponential backoff.

    Args:
        deadline_seconds (int): total time budget; once a retry would exceed it
            the last exception is re-raised to the caller.
        max_backoff (int): upper bound, in seconds, for a single backoff delay.
    """
    def decorator(function):
        from functools import wraps
        @wraps(function)
        def wrapper(*args, **kwargs):
            deadline = time.time() + deadline_seconds
            retry_number = 0
            while True:
                try:
                    return function(*args, **kwargs)
                except Exception:
                    current_t = time.time()
                    # delay grows as 2^retry plus jitter, capped at max_backoff
                    backoff_delay = min(
                        math.pow(2, retry_number) + random.random(), max_backoff
                    )
                    if current_t + backoff_delay < deadline:
                        time.sleep(backoff_delay)
                        retry_number += 1
                        # fix: logging.warn is deprecated, use logging.warning
                        logging.warning(f"#{retry_number} nuv_retry detected a failure...")
                        continue  # retry again
                    else:
                        raise
        return wrapper
    return decorator
def get_default_storage_class():
    """
    Return the name of the default storage class defined on the configured
    kubernetes environment, checking both the stable and the legacy beta
    default-class annotations. Returns an empty string when none is found.
    """
    stable = kube.kubectl("get", "storageclass", jsonpath=r"{.items[?(@.metadata.annotations.storageclass\.kubernetes\.io\/is-default-class=='true')].metadata.name}")
    beta = kube.kubectl("get", "storageclass", jsonpath=r"{.items[?(@.metadata.annotations.storageclass\.beta\.kubernetes\.io\/is-default-class=='true')].metadata.name}")
    candidates = stable + beta
    return candidates[0] if candidates else ""
def get_default_storage_provisioner():
    """
    Get the provisioner of the default storage class, checking both the stable
    and the legacy beta default-class annotations.
    """
    provisioner = kube.kubectl("get", "storageclass", jsonpath=r"{.items[?(@.metadata.annotations.storageclass\.kubernetes\.io\/is-default-class=='true')].provisioner}")
    # fix: the beta-annotation query must also extract .provisioner; it was
    # copy-pasted from get_default_storage_class and returned .metadata.name
    provisioner += kube.kubectl("get", "storageclass", jsonpath=r"{.items[?(@.metadata.annotations.storageclass\.beta\.kubernetes\.io\/is-default-class=='true')].provisioner}")
    if(provisioner):
        return provisioner[0]
    return ""
def get_ingress_namespace(runtime):
    """
    Attempt to determine the namespace where the ingress-nginx-controller service has been deployed
    checking the nuvolaris.ingresslb
    - When set to 'auto' it will attempt to calculate it according to the kubernetes runtime
    - When set to <> 'auto' it will return the configured value. The configured value should be in the form <namespace>/<ingress-nginx-controller-service-name>
    >>> import nuvolaris.config as cfg
    >>> cfg.put('nuvolaris.ingresslb','auto')
    True
    >>> get_ingress_namespace('microk8s')
    'ingress'
    >>> get_ingress_namespace('kind')
    'ingress-nginx'
    >>> cfg.put('nuvolaris.ingresslb','ingress-nginx-azure/ingress-nginx-controller')
    True
    >>> get_ingress_namespace('kind')
    'ingress-nginx-azure'
    """
    configured = cfg.get('nuvolaris.ingresslb') or 'auto'
    if configured != 'auto':
        # explicit <namespace>/<service> override: take the namespace part
        namespace = configured.split('/')[0]
        logging.debug(f"skipping ingress namespace auto detection and returning {namespace}")
        return namespace
    # microk8s deploys its ingress addon under the 'ingress' namespace
    return "ingress" if runtime == "microk8s" else "ingress-nginx"
def get_ingress_service_name(runtime):
    """
    Attempt to determine the namespace where the ingress-nginx-controller service has been deployed
    checking the nuvolaris.ingresslb
    - When set to 'auto' it will attempt to calculate it according to the kubernetes runtime
    - When set to <> 'auto' it will return the configured value. The configured value should be in the form <namespace>/<ingress-nginx-controller-service-name>
    >>> import nuvolaris.config as cfg
    >>> cfg.put('nuvolaris.ingresslb','auto')
    True
    >>> get_ingress_service_name('microk8s')
    'service/ingress-nginx-controller'
    >>> get_ingress_service_name('kind')
    'service/ingress-nginx-controller'
    >>> cfg.put('nuvolaris.ingresslb','ingress-nginx-azure/ingress-nginx-controller-custom')
    True
    >>> get_ingress_service_name('kind')
    'service/ingress-nginx-controller-custom'
    """
    configured = cfg.get('nuvolaris.ingresslb') or 'auto'
    if configured != 'auto':
        # explicit <namespace>/<service> override: take the service part
        srv_name = f"service/{configured.split('/')[1]}"
        logging.debug(f"skipping ingress service name auto detection and returning {srv_name}")
        return srv_name
    # every supported runtime uses the standard controller service name
    return "service/ingress-nginx-controller"
def get_ingress_class(runtime):
    """
    Attempt to determine the proper ingress class.
    - When set to 'auto' (or unset) it is derived from the kubernetes runtime:
      'public' on microk8s, 'traefik' on k3s, 'nginx' otherwise.
    - When set to <> 'auto' it will return the configured value.
    """
    ingress_class = cfg.get('nuvolaris.ingressclass') or 'auto'
    if 'auto' != ingress_class:
        # fix: logging.warn is deprecated, use logging.warning
        logging.warning(f"skipping ingress class auto detection and returning {ingress_class}")
        return ingress_class
    # On microk8s ingress class must be public
    if runtime == "microk8s":
        return "public"
    # On k3s ingress class must be traefik
    if runtime == "k3s":
        return "traefik"
    # ingress class defaults to nginx
    return "nginx"
# determine the ingress-nginx flavour
def get_ingress_yaml(runtime):
    """Map the kubernetes runtime to the matching nginx ingress descriptor file."""
    descriptors = {
        "eks": "eks-nginx-ingress.yaml",
        "kind": "kind-nginx-ingress.yaml",
    }
    return descriptors.get(runtime, "cloud-nginx-ingress.yaml")
# wait for a pod name
@nuv_retry()
def get_pod_name(jsonpath,namespace="nuvolaris"):
    """Return the name of the first pod matching the given jsonpath.

    Retried via nuv_retry until a pod appears or the retry deadline expires,
    at which point the exception below propagates to the caller.
    """
    pod_name = kube.kubectl("get", "pods", namespace=namespace, jsonpath=jsonpath)
    if(pod_name):
        return pod_name[0]
    raise Exception(f"could not find any pod matching jsonpath={jsonpath}")
# helper method waiting for a pod ready using the given jsonpath to retrieve the pod name
def wait_for_pod_ready(pod_name_jsonpath, timeout="600s", namespace="nuvolaris"):
    """Block until the pod matched by pod_name_jsonpath reports condition=ready.

    Any error (including the pod never appearing) is logged and swallowed,
    so this helper never raises.
    """
    try:
        pod_name = get_pod_name(pod_name_jsonpath, namespace)
        logging.info(f"checking pod {pod_name}")
        # kube.wait is falsy until the ready condition is met, so keep polling
        while not kube.wait(f"pod/{pod_name}", "condition=ready", timeout, namespace):
            logging.info(f"waiting for {pod_name} to be ready...")
            time.sleep(1)
    except Exception as e:
        logging.error(e)
def status_matches(code: int, allowed: List[Union[int, str]]) -> bool:
    """Check if the status code matches any allowed pattern.

    Args:
        code: concrete HTTP status code (e.g. 503).
        allowed: a mix of exact codes (e.g. 200) and wildcard classes ("2XX", "5XX").

    Returns:
        True when code equals an int pattern or falls inside a "NXX" class.
    """
    # fix: this function was accidentally defined twice back to back;
    # the byte-identical duplicate definition has been removed.
    for pattern in allowed:
        if isinstance(pattern, int) and code == pattern:
            return True
        if isinstance(pattern, str) and len(pattern) == 3 and pattern.endswith("XX"):
            # "5XX" matches any code in 500..599
            if int(pattern[0]) == code // 100:
                return True
    return False
def wait_for_http(url: str, timeout: int = 60, up_statuses: List[Union[int, str]] = None):
    """Wait until an HTTP endpoint becomes available with an accepted status code.

    Args:
        url (str): Full URL to check (e.g. http://milvus:9091/healthz)
        timeout (int): Total seconds to wait before giving up.
        up_statuses (List[Union[int, str]]): Status codes or patterns considered as 'UP'.
            Defaults to [200].

    Raises:
        TimeoutError: If the endpoint doesn't respond with a valid status within the timeout.
    """
    # fix: avoid the mutable default argument anti-pattern
    if up_statuses is None:
        up_statuses = [200]
    parsed = urlparse(url)
    scheme = parsed.scheme
    host = parsed.hostname
    port = parsed.port or (443 if scheme == "https" else 80)
    path = parsed.path or "/"
    # retries=False: we implement our own polling loop below
    if scheme == "https":
        conn = urllib3.connectionpool.HTTPSConnectionPool(host, port=port,
                                                          timeout=urllib3.util.Timeout(connect=5.0, read=5.0),
                                                          retries=False)
    else:
        conn = urllib3.connectionpool.HTTPConnectionPool(host, port=port,
                                                         timeout=urllib3.util.Timeout(connect=5.0, read=5.0),
                                                         retries=False)
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            response = conn.request("GET", path)
            if status_matches(response.status, up_statuses):
                logging.info(f"Service is up: {url} (status {response.status})")
                return
            else:
                logging.warning(f"Service responded with {response.status}, not in {up_statuses}. Waiting...")
        except (NewConnectionError, MaxRetryError):
            logging.warning(f"Cannot connect to {url}, retrying...")
        except ProtocolError as e:
            if "Connection reset by peer" in str(e):
                logging.warning("Connection reset by peer. Sleeping 2 seconds...")
                time.sleep(2)
                continue
            else:
                logging.error(f"Protocol error: {e}")
        time.sleep(1)
    # fix: the docstring promised a TimeoutError but the function previously
    # fell through and returned None silently after the deadline expired
    raise TimeoutError(f"service {url} did not respond with an accepted status within {timeout} seconds")
# return mongodb configuration parameter with default valued if not configured
def get_mongodb_config_data():
    """Build the template data used to render the mongodb deployment.

    Returns a dict of credentials, volume size and pvc settings, falling back
    to built-in defaults for anything not configured.
    """
    data = {
        'mongo_admin_user': cfg.get('mongodb.admin.user') or "whisk_user",
        'mongo_admin_password': cfg.get('mongodb.admin.password') or "0therPa55",
        'mongo_nuvolaris_user': cfg.get('mongodb.nuvolaris.user') or "nuvolaris",
        'mongo_nuvolaris_password': cfg.get('mongodb.nuvolaris.password') or "s0meP@ass3",
        'size': cfg.get('mongodb.volume-size') or 10,
        'pvcName': 'mongodb-data',
        'storageClass':cfg.get("nuvolaris.storageclass"),
        'pvcAccessMode':'ReadWriteOnce'
    }
    return data
def parse_image(img):
    """
    Parse a string representing a pod image in the form <image>:<tag> and return
    a dictionary containing {"image":<img>, "tag":<tag>}
    >>> img_data = parse_image("ghcr.io/nuvolaris/openwhisk-controller:0.3.0-morpheus.22122609")
    >>> "ghcr.io/nuvolaris/openwhisk-controller" == img_data["image"]
    True
    >>> "0.3.0-morpheus.22122609" == img_data["tag"]
    True
    """
    parts = img.split(":")
    if len(parts) != 2:
        raise Exception(f"wrong image name format {img}. Image and tag must be separated by a :")
    image, tag = parts
    return {"image": image, "tag": tag}
def get_controller_image_data(data):
    """Populate data (in place) with 'controller_image' and 'controller_tag'.

    When controller.image embeds a tag (<image>:<tag>) that value wins,
    otherwise controller.image / controller.tag are read individually with
    built-in defaults.
    """
    # fix: cfg.get may return None when controller.image is unset; the
    # original `":" in controller_image` would then raise a TypeError
    controller_image = cfg.get("controller.image") or ""
    if ":" in controller_image:
        img_data = parse_image(controller_image)
        data['controller_image'] = img_data["image"]
        data['controller_tag'] = img_data["tag"]
    else:
        data['controller_image'] = controller_image or "ghcr.io/nuvolaris/openwhisk-controller"
        data['controller_tag'] = cfg.get("controller.tag") or "3.1.0-mastrogpt.2402101445"
# return configuration parameters for the standalone controller
def get_standalone_config_data():
    """Build the template data used to render the standalone OpenWhisk controller.

    Collects couchdb connection settings, OpenWhisk limits and container
    resources, falling back to built-in defaults when not configured, then
    merges in the controller image and affinity/tolerations settings.
    """
    data = {
        "name":"controller",
        "couchdb_host": cfg.get("couchdb.host") or "couchdb",
        "couchdb_port": cfg.get("couchdb.port") or "5984",
        "couchdb_admin_user": cfg.get("couchdb.admin.user"),
        "couchdb_admin_password": cfg.get("couchdb.admin.password"),
        "couchdb_controller_user": cfg.get("couchdb.controller.user"),
        "couchdb_controller_password": cfg.get("couchdb.controller.password"),
        "triggers_fires_perMinute": cfg.get("configs.limits.triggers.fires-perMinute") or 60,
        "actions_sequence_maxLength": cfg.get("configs.limits.actions.sequence-maxLength") or 50,
        "actions_invokes_perMinute": cfg.get("configs.limits.actions.invokes-perMinute") or 60,
        "actions_invokes_concurrent": cfg.get("configs.limits.actions.invokes-concurrent") or 30,
        "activation_payload_max": cfg.get('configs.limits.activations.max_allowed_payload') or "1048576",
        "time_limit_min": cfg.get("configs.limits.time.limit-min") or "100ms",
        "time_limit_std": cfg.get("configs.limits.time.limit-std") or "1min",
        "time_limit_max": cfg.get("configs.limits.time.limit-max") or "5min",
        "memory_limit_min": cfg.get("configs.limits.memory.limit-min") or "128m",
        "memory_limit_std": cfg.get("configs.limits.memory.limit-std") or "256m",
        "memory_limit_max": cfg.get("configs.limits.memory.limit-max") or "512m",
        "concurrency_limit_min": cfg.get("configs.limits.concurrency.limit-min") or 1,
        "concurrency_limit_std": cfg.get("configs.limits.concurrency.limit-std") or 1,
        "concurrency_limit_max": cfg.get("configs.limits.concurrency.limit-max") or 1,
        "controller_java_opts": cfg.get('configs.controller.javaOpts') or "-Xmx2048M",
        "invoker_containerpool_usermemory": cfg.get('configs.invoker.containerPool.userMemory') or "2048m",
        "container_cpu_req": cfg.get('configs.controller.resources.cpu-req') or "500m",
        "container_cpu_lim": cfg.get('configs.controller.resources.cpu-lim') or "1",
        "container_mem_req": cfg.get('configs.controller.resources.mem-req') or "1G",
        "container_mem_lim": cfg.get('configs.controller.resources.mem-lim') or "2G",
        # resources are only enforced when an explicit cpu request is configured
        "container_manage_resources": cfg.exists('configs.controller.resources.cpu-req')
    }
    get_controller_image_data(data)
    standalone_affinity_tolerations_data(data)
    return data
def validate_ow_auth(auth):
    """
    Validate an OpenWhisk authorization string in the form <uuid4>:<key>,
    where the key must be at least 64 characters long.

    >>> import nuvolaris.testutil as tutil
    >>> import nuvolaris.util as util
    >>> auth = tutil.generate_ow_auth()
    >>> util.validate_ow_auth(auth)
    True
    >>> util.validate_ow_auth('21321:3213216')
    False
    """
    try:
        parts = auth.split(':')
        # fix: an explicit part-count check replaces the implicit IndexError
        # the original relied on when no ':' was present
        if len(parts) != 2:
            logging.error('authorization must be in the form <uuid>:<key>')
            return False
        try:
            # the id part must parse as a valid UUID
            uuid.UUID(parts[0], version=4)
        except ValueError:
            logging.error('authorization id is not a valid UUID')
            return False
        if len(parts[1]) < 64:
            logging.error('authorization key must be at least 64 characters long')
            return False
        return True
    except Exception as e:
        logging.error('failed to determine authorization id and key: %s' % e)
        return False
def check(f, what, res):
    """Log the outcome of a single check and fold it into an accumulated result.

    Args:
        f: truthy when this check passed.
        what: human readable description of the check.
        res: accumulated result from previous checks.

    Returns:
        The accumulator (res and True) when f is truthy, False otherwise.
    """
    if f:
        logging.info(f"OK: {what}")
        return res and True
    else:
        # fix: logging.warn is deprecated, use logging.warning
        logging.warning(f"ERR: {what}")
        return False
# return redis configuration parameters with default values if not configured
def get_redis_config_data():
    """Build the template data used to render the redis deployment."""
    # ensure prefix key contains : at the end to be compliant with REDIS script ACL creator
    prefix = cfg.get("redis.nuvolaris.prefix") or "nuvolaris:"
    if(not prefix.endswith(":")):
        prefix = f"{prefix}:"
    data = {
        "applypodsecurity":get_enable_pod_security(),
        "name": "redis",
        "container": "redis",
        "dir": "/bitnami/redis/data",
        "size": cfg.get("redis.volume-size", "REDIS_VOLUME_SIZE", 10),
        "storageClass": cfg.get("nuvolaris.storageclass"),
        "redis_password":cfg.get("redis.default.password") or "s0meP@ass3",
        "namespace":"nuvolaris",
        "password":cfg.get("redis.nuvolaris.password") or "s0meP@ass3",
        "prefix": prefix,
        # persistence is opt-in for redis (compare kvrocks, which forces it on)
        "persistence": cfg.get("redis.persistence-enabled") or False,
        "maxmemory": cfg.get("redis.maxmemory") or "1000mb"
    }
    redis_affinity_tolerations_data(data)
    return data
def get_service(jsonpath,namespace="nuvolaris"):
    """Return the first service matching the given jsonpath, raising when none is found."""
    matching = kube.kubectl("get", "svc", namespace=namespace, jsonpath=jsonpath)
    if not matching:
        raise Exception(f"could not find any svc matching jsonpath={jsonpath}")
    return matching[0]
# return minio configuration parameters with default values if not configured
def get_minio_config_data():
    """Build the template data used to render the minio deployment,
    including optional s3/console ingress settings."""
    data = {
        "applypodsecurity":get_enable_pod_security(),
        "name":"minio-deployment",
        "container":"minio",
        "minio_host": cfg.get('minio.host') or 'nuvolaris-minio',
        "minio_volume_size": cfg.get('minio.volume-size') or "5",
        "minio_root_user": cfg.get('minio.admin.user') or "minio",
        "minio_root_password": cfg.get('minio.admin.password') or "minio123",
        "storage_class": cfg.get("nuvolaris.storageclass"),
        "minio_nuv_user": cfg.get('minio.nuvolaris.user') or "nuvolaris",
        "minio_nuv_password": cfg.get('minio.nuvolaris.password') or "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
        "minio_s3_ingress_enabled": cfg.get('minio.ingress.s3-enabled') or False,
        "minio_console_ingress_enabled": cfg.get('minio.ingress.console-enabled') or False,
        # "auto" hostnames are resolved later from the cluster apihost
        "minio_s3_ingress_hostname": cfg.get('minio.ingress.s3-hostname') or "auto",
        "minio_console_ingress_hostname": cfg.get('minio.ingress.console-hostname') or "auto"
    }
    minio_affinity_tolerations_data(data)
    return data
# return postgres configuration parameter with default valued if not configured
def get_postgres_config_data():
    """Build the template data used to render the postgres (kubegres) deployment."""
    data = {
        # NOTE(review): root and replica passwords both read 'postgres.admin.password'
        # but fall back to different defaults — confirm this is intended
        'postgres_root_password': cfg.get('postgres.admin.password') or "0therPa55",
        'postgres_root_replica_password': cfg.get('postgres.admin.password') or "0therPa55sd",
        'postgres_nuvolaris_user': "nuvolaris",
        'postgres_nuvolaris_password': cfg.get('postgres.nuvolaris.password') or "s0meP@ass3",
        'size': cfg.get('postgres.volume-size') or 10,
        'replicas': cfg.get('postgres.admin.replicas') or 2,
        'storageClass': cfg.get('nuvolaris.storageclass'),
        'failover': cfg.get('postgres.failover') or False,
        'backup': cfg.get('postgres.backup.enabled') or False,
        'schedule': cfg.get('postgres.backup.schedule') or '30 * * * *'
    }
    postgres_affinity_tolerations_data(data)
    return data
def get_postgres_backup_data():
    """Build the template data used to render the postgres backup cronjob."""
    data = {
        'size': cfg.get('postgres.volume-size') or 10,
        'storageClass': cfg.get('nuvolaris.storageclass'),
        'schedule': cfg.get('postgres.backup.schedule') or '30 * * * *',
        'name': 'nuvolaris-postgres-backup',
        'dir':'/var/lib/backup',
        'container':'nuvolaris-postgres-backup'
    }
    postgres_affinity_tolerations_data(data)
    return data
# wait for a service matching the given jsonpath name
@nuv_retry()
def wait_for_service(jsonpath,namespace="nuvolaris"):
    """Wait (via nuv_retry) for a service matching the given jsonpath and return its name."""
    service_names = kube.kubectl("get", "svc", namespace=namespace, jsonpath=jsonpath)
    if(service_names):
        return service_names[0]
    # fix: the message mentioned 'pod' although this function waits for a service
    raise Exception(f"could not find any svc matching jsonpath={jsonpath}")
def get_controller_http_timeout():
    """Return the maximum action time limit, used as the controller HTTP timeout."""
    configured = cfg.get("configs.limits.time.limit-max")
    return configured if configured else "5min"
def get_apihost_from_config_map(namespace="nuvolaris"):
    """Return the apihost annotation stored on the internal cm/config config map."""
    values = kube.kubectl("get", "cm/config", namespace=namespace, jsonpath='{.metadata.annotations.apihost}')
    if not values:
        raise Exception("Could not find apihost annotation inside internal cm/config config Map")
    return values[0]
def get_value_from_config_map(namespace="nuvolaris", path='{.metadata.annotations.apihost}'):
    """Return the value extracted from the internal cm/config config map via the given jsonpath."""
    values = kube.kubectl("get", "cm/config", namespace=namespace, jsonpath=path)
    if not values:
        raise Exception(f"Could not find {path} annotation inside internal cm/config config Map")
    return values[0]
def get_enable_pod_security():
    """
    Return true if there is the need to enable pod security context
    for some specific pod. This is a test based on some empiric assumption on runtime
    basis and/or storage class.
    @TODO: find a better way to determine when this function should return true.
    """
    runtime = cfg.get('nuvolaris.kube')
    # fix: default to "" so the `in` membership test below does not raise a
    # TypeError when nuvolaris.storageclass is not configured (cfg.get -> None)
    storage_class = cfg.get('nuvolaris.storageclass') or ""
    return runtime in ["eks","gke","aks","generic"] or (runtime in ["k3s"] and "rook" in storage_class)
def get_runtimes_json_from_config_map(namespace="nuvolaris", path=r'{.data.runtimes\.json}'):
    """ Return the configured runtimes.json from the config map cm/openwhisk-runtimes
    """
    entries = kube.kubectl("get", "cm/openwhisk-runtimes", namespace=namespace, jsonpath=path)
    if not entries:
        raise Exception("Could not find runtimes.json inside cm/openwhisk-runtimes config Map")
    return entries[0]
# return static nginx configuration parameters with default values if not configured
def get_storage_static_config_data():
    """Build the template data used to render the nuvolaris static nginx frontend.

    The backing storage_url points at minio when enabled; when cosi is also
    enabled its RGW endpoint overrides the minio url.
    """
    data = {
        "name":"nuvolaris-static",
        "container":"nuvolaris-static",
        "size":1,
        "storageClass": cfg.get('nuvolaris.storageclass'),
        "dir":"/var/cache/nginx",
        "applypodsecurity": get_enable_pod_security()
    }
    if cfg.get('components.minio'):
        minio_host=cfg.get('minio.host') or "nuvolaris-minio"
        minio_port=cfg.get('minio.port') or "9000"
        data['storage_url']=f"http://{minio_host}.nuvolaris.svc.cluster.local:{minio_port}"
    # cosi wins over minio when both components are enabled
    if cfg.get('components.cosi'):
        data['storage_url']=apihost_util.add_suffix_to_url(get_object_storage_rgw_url(),"cluster.local")
    storage_static_affinity_tolerations_data(data)
    return data
# populate common affinity data
def common_affinity_tolerations_data(data):
    """Populate data (in place) with the affinity/tolerations settings shared by all components."""
    data["affinity"] = cfg.get('nuvolaris.affinity') or False
    data["tolerations"] = cfg.get('nuvolaris.tolerations') or False
    data["affinity_invoker_node_label"] = "invoker"
    data["affinity_core_node_label"] = "core"
    data["toleration_role"] = "core"
# populate specific affinity data for couchdb
def couch_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the couchdb anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "couchdb"
# populate specific affinity data for redis
def redis_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the redis anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "redis"
# populate specific affinity data for minio
def minio_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the minio anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "minio"
# populate specific affinity data for the static nginx frontend
def storage_static_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the static nginx anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "nuvolaris-static"
# populate specific affinity data for postgres
def postgres_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the postgres anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "nuvolaris-postgres"
# populate specific affinity data for ferretdb
def ferretb_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the ferretdb anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "ferretdb"
# populate specific affinity data for the standalone controller
def standalone_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the controller anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "controller"
# populate specific affinity data for postgres controller manager
def postgres_manager_affinity_tolerations_data():
    """Build and return the affinity/tolerations data for the kubegres controller manager."""
    data = {
        "pod_anti_affinity_name":"kubegres-controller-manager",
        "name":"kubegres-controller-manager"
    }
    common_affinity_tolerations_data(data)
    return data
def postgres_backup_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the postgres backup anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "nuvolaris-postgres-backup"
# wait for a pod name using a label selector and eventually an optional jsonpath
@nuv_retry()
def get_pod_name_by_selector(selector, jsonpath, namespace="nuvolaris"):
    """
    Get pods matching the given label selector, filtering them using the given jsonpath.
    Retried via nuv_retry until a match appears or the retry deadline expires.
    param: selector (eg app="nuvolaris-postgres")
    param: jsonpath (eg "{.items[?(@.metadata.labels.replicationRole == 'primary')].metadata.name}")
    return: 1st matching pod name
    """
    pod_names = kube.kubectl("get", "pods","-l", selector, namespace=namespace, jsonpath=jsonpath)
    if(pod_names):
        return pod_names[0]
    raise Exception(f"could not find any pod matching jsonpath={jsonpath}")
# wait for a svc name using a label selector and eventually an optional jsonpath
@nuv_retry()
def get_service_by_selector(selector,jsonpath,namespace="nuvolaris"):
    """
    Get services matching the given label selector, filtering them using the given jsonpath.
    Retried via nuv_retry until a match appears or the retry deadline expires.
    param: selector (eg app="nuvolaris-postgres")
    param: jsonpath (eg "{.items[?(@.metadata.labels.replicationRole == 'primary')].metadata.name}")
    return: 1st matching service name
    """
    services= kube.kubectl("get", "svc","-l",selector, namespace=namespace, jsonpath=jsonpath)
    if(services):
        return services[0]
    raise Exception(f"could not find any svc matching jsonpath={jsonpath}")
def get_kvrocks_config_data():
    """Build the template data used to render the kvrocks (redis-compatible) deployment."""
    # ensure prefix key contains : at the end to be compliant with REDIS script ACL creator
    prefix = cfg.get("redis.nuvolaris.prefix") or "nuvolaris:"
    if(not prefix.endswith(":")):
        prefix = f"{prefix}:"
    data = {
        "applypodsecurity":get_enable_pod_security(),
        "name": "kvrocks",
        "container": "redis",
        "dir": "/var/lib/kvrocks/data",
        "size": cfg.get("redis.volume-size", "REDIS_VOLUME_SIZE", 10),
        "storageClass": cfg.get("nuvolaris.storageclass"),
        "redis_password":cfg.get("redis.default.password") or "s0meP@ass3",
        "namespace":"nuvolaris",
        "password":cfg.get("redis.nuvolaris.password") or "s0meP@ass3",
        "prefix": prefix,
        # kvrocks is disk-backed, so persistence is always on
        "persistence": True,
        "maxmemory": cfg.get("redis.maxmemory") or "1000mb",
        "pvcName":"kvrocks-pvc",
        # NOTE(review): cpu defaults "128"/"256" carry no unit, unlike the
        # "250m"-style values used elsewhere — confirm the intended meaning
        "container_cpu_req": cfg.get('redis.resources.cpu-req') or "128",
        "container_cpu_lim": cfg.get('redis.resources.cpu-lim') or "256",
        "container_mem_req": cfg.get('redis.resources.mem-req') or "512m",
        "container_mem_lim": cfg.get('redis.resources.mem-lim') or "1Gi",
    }
    redis_affinity_tolerations_data(data)
    return data
def get_object_storage_class():
    """
    Get the object storage class attempting to get the default storage class defined on the configured kubernetes environment
    """
    matches = kube.kubectl("get", "storageclass", jsonpath="{.items[?(@.parameters.objectStoreName=='nuvolaris-s3-store')].metadata.name}")
    return matches[0] if matches else ""
def get_object_storage_rgw_url():
    """
    Get the object store RGW service URL, to be used to configure the static nginx services when running on top of a CEPH OBJECT STORE
    """
    endpoints = kube.kubectl("get", "cephobjectstores",namespace="rook-ceph",jsonpath="{.items[?(@.metadata.name=='nuvolaris-s3-store')].status.info.endpoint}")
    return endpoints[0] if endpoints else ""
def get_object_storage_rgw_srv_name():
    """
    Get the name of the object store RGW service, to be used to configure the static nginx services when running on top of a CEPH OBJECT STORE
    """
    rgw_urls = kube.kubectl("get", "svc",namespace="rook-ceph",jsonpath="{.items[?(@.metadata.labels.rgw=='nuvolaris-s3-store')].metadata.name}")
    if(rgw_urls):
        return rgw_urls[0]
    return ""
def get_object_storage_rgw_srv_http_port():
    """
    Get the http port of the object store RGW service, to be used to configure the static nginx services when running on top of a CEPH OBJECT STORE
    """
    rgw_ports = kube.kubectl("get", "svc",namespace="rook-ceph",jsonpath="{.items[?(@.metadata.labels.rgw=='nuvolaris-s3-store')].spec.ports[?(@.name=='http')].port}")
    if(rgw_ports):
        return rgw_ports[0]
    return ""
def get_cosi_config_data():
    """Build the template data used to render the COSI (container object storage) support."""
    data = {
        "bucket_storageclass": cfg.get('cosi.bucket_storageclass') or "rook-ceph-bucket",
        "s3_ingress_enabled": cfg.get('cosi.ingress.s3-enabled') or False,
        # "auto" hostname is resolved later from the cluster apihost
        "s3_ingress_hostname": cfg.get('cosi.ingress.s3-hostname') or "auto",
        "rgwservice_name": cfg.get('cosi.rgwservice_name'),
        "rgwservice_port": cfg.get('cosi.rgwservice_port'),
        "cluster_namespace": cfg.get('cosi.namespace') or "rook-ceph",
        "object_store_name": cfg.get('cosi.object_store_name') or "nuvolaris-s3-store",
        "max_bucket_limit": cfg.get('cosi.max_bucket_limit') or 5,
    }
    return data
def b64_encode(value:str):
    """
    Encode a value as base 64.
    param: value to be encoded
    return: the input value in case of error, otherwise the b64 representation of the input value
    """
    try:
        return b64encode(value.encode(encoding="utf-8")).decode()
    # fix: narrow the original bare except, which would also swallow
    # SystemExit and KeyboardInterrupt
    except Exception:
        return value
def b64_decode(encoded_str:str):
    """
    Base 64 decode.
    param: encoded_str a b64 encoded string
    return: the input value in case of error, the decoded string otherwise.
    """
    try:
        return b64decode(encoded_str).decode()
    # fix: narrow the original bare except, which would also swallow
    # SystemExit and KeyboardInterrupt
    except Exception:
        return encoded_str
# populate specific affinity data for etcd
def etcd_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the etcd anti-affinity name (mutates data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "nuvolaris-etcd"
def get_etcd_initial_clusters(name: str, replicas = 1):
    """ Calculate the proper setup for ETCD initial clusters
    >>> print(get_etcd_initial_clusters("nuvolaris-etcd"))
    nuvolaris-etcd-0=http://nuvolaris-etcd-0.nuvolaris-etcd-headless.nuvolaris.svc.cluster.local:2380
    >>> print(get_etcd_initial_clusters("nuvolaris-etcd",2))
    nuvolaris-etcd-0=http://nuvolaris-etcd-0.nuvolaris-etcd-headless.nuvolaris.svc.cluster.local:2380,nuvolaris-etcd-1=http://nuvolaris-etcd-1.nuvolaris-etcd-headless.nuvolaris.svc.cluster.local:2380
    """
    # one comma-separated <member>=<peer-url> entry per replica
    members = (
        f"{name}-{idx}=http://{name}-{idx}.{name}-headless.nuvolaris.svc.cluster.local:2380"
        for idx in range(replicas)
    )
    return ",".join(members)
# populate etcd configuration parameters
def get_etcd_config_data():
    """Build the template data used to render the etcd deployment."""
    data = {
        "applypodsecurity":get_enable_pod_security(),
        "name": "nuvolaris-etcd",
        "container": "nuvolaris-etcd",
        # NOTE(review): the env fallback key "REDIS_VOLUME_SIZE" looks copy-pasted
        # from the redis configuration — confirm whether an etcd-specific key was intended
        "size": cfg.get("etcd.volume-size", "REDIS_VOLUME_SIZE", 5),
        "storageClass": cfg.get("nuvolaris.storageclass"),
        "root_password":cfg.get("etcd.root.password") or "s0meP@ass3wd",
        "etcd_replicas":get_etcd_replica(),
        "etcd_auto_compaction_retention": cfg.get("etcd.auto_compaction_retention") or "1",
        "etcd_quota_backend_bytes": cfg.get("etcd.quota-backend-bytes") or "2147483648",
        "namespace":"nuvolaris",
        "container_cpu_req": cfg.get('etcd.resources.cpu-req') or "250m",
        "container_cpu_lim": cfg.get('etcd.resources.cpu-lim') or "375m",
        "container_mem_req": cfg.get('etcd.resources.mem-req') or "256Mi",
        "container_mem_lim": cfg.get('etcd.resources.mem-lim') or "384Mi"
    }
    # comma separated <member>=<peer-url> list for the etcd initial cluster setup
    data["etc_initial_cluster"] = get_etcd_initial_clusters(data["container"],data['etcd_replicas'])
    etcd_affinity_tolerations_data(data)
    return data
def get_etcd_replica():
    """Return the configured number of etcd replicas, defaulting to 1."""
    replicas = cfg.get("etcd.replicas")
    return replicas if replicas else 1
# populate specific affinity data for milvus controller manager
def milvus_manager_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the milvus operator anti-affinity name (mutates and returns data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "milvus-operator"
    return data
def milvus_standalone_affinity_tolerations_data(data):
    """Add the common affinity/tolerations keys plus the milvus standalone naming (mutates and returns data)."""
    common_affinity_tolerations_data(data)
    data["pod_anti_affinity_name"] = "nuvolaris-milvus"
    data["name"] = "nuvolaris-milvus-standalone"
    data["container-name"] = "nuvolaris-milvus"
    return data
# return milvus configuration parameter with default valued if not configured
def get_milvus_config_data():
    """Build the template data used to render the milvus vector database deployment,
    including its etcd and s3 (minio) backing services."""
    data = {
        'milvus_etcd_username': "etcdmilvus",
        'milvus_etcd_password': cfg.get('milvus.password.etcd') or "0therPa55",
        'milvus_etcd_root_password':cfg.get("etcd.root.password") or "s0meP@ass3wd",
        'milvus_etcd_prefix': 'milvus',
        'milvus_s3_username': 'miniomilvus',
        'milvus_s3_password': cfg.get('milvus.password.s3') or "s0meP@ass3",
        'milvus_bucket_name': 'vectors',
        'milvus_bucket_prefix': 'milvus/nuvolaris-milvus',
        'size': cfg.get('milvus.volume-size.cluster') or 10,
        'zookeeper_size': cfg.get('milvus.volume-size.zookeeper') or 10,
        'bookie_journal_size': cfg.get('milvus.volume-size.journal') or 25,
        'bookie_ledgers_size': cfg.get('milvus.volume-size.ledgers') or 50,
        'replicas': cfg.get('milvus.replicas') or 1,
        'storageClass': cfg.get('nuvolaris.storageclass'),
        'etcd_replicas':get_etcd_replica(),
        'etcd_container': 'nuvolaris-etcd',
        'milvus_root_password': cfg.get('milvus.password.root') or "An0therPa55",
        'nuvolaris_password': cfg.get('milvus.nuvolaris.password') or "Nuv0therPa55",
        'milvus_max_role_num': cfg.get('milvus.proxy.max-role-num') or 10,
        'milvus_max_user_num': cfg.get('milvus.proxy.max-user-num') or 100,
        'milvus_max_database_num': cfg.get('milvus.root-coord.max-database-num') or 64
    }
    # range object consumed by the template to iterate over etcd members
    data["etcd_range"]=range(data["etcd_replicas"])
    milvus_standalone_affinity_tolerations_data(data)
    return data