nuvolaris/milvus_standalone.py (70 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import time import kopf, logging import nuvolaris.kube as kube import nuvolaris.kustomize as kus import nuvolaris.config as cfg import nuvolaris.util as util import nuvolaris.operator_util as operator_util import nuvolaris.minio_util as mutil import nuvolaris.openwhisk as openwhisk from nuvolaris.milvus_admin_client import MilvusAdminClient from nuvolaris.user_config import UserConfig from nuvolaris.user_metadata import UserMetadata def patchEntries(data: dict): tplp = ["milvus-cfg-base.yaml", "milvus.yaml"] if (data['affinity'] or data['tolerations']): tplp.append("affinity-tolerance-dep-core-attach.yaml") kust = kus.patchTemplates("milvus", tplp, data) kust += kus.patchGenericEntry("Secret", "nuvolaris-milvus-etcd-secret", "/data/username", util.b64_encode(data['milvus_etcd_username'])) kust += kus.patchGenericEntry("Secret", "nuvolaris-milvus-etcd-secret", "/data/password", util.b64_encode(data['milvus_etcd_password'])) kust += kus.patchGenericEntry("Secret", "nuvolaris-milvus-s3-secret", "/stringData/accesskey", data['milvus_s3_username']) kust += kus.patchGenericEntry("Secret", "nuvolaris-milvus-s3-secret", "/stringData/secretkey", data['milvus_s3_password']) kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus", "/spec/storageClassName", data['storageClass']) kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus", "/spec/resources/requests/storage", f"{data['size']}Gi") kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus-zookeeper", "/spec/storageClassName", data['storageClass']) kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus-zookeeper", "/spec/resources/requests/storage", f"{data['zookeeper_size']}Gi") kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus-bookie-journal", "/spec/storageClassName", data['storageClass']) kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus-bookie-journal", "/spec/resources/requests/storage", f"{data['bookie_journal_size']}Gi") kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus-bookie-ledgers", "/spec/storageClassName", data['storageClass']) kust += kus.patchGenericEntry("PersistentVolumeClaim", "nuvolaris-milvus-bookie-ledgers", "/spec/resources/requests/storage", f"{data['bookie_ledgers_size']}Gi") return kust def create(owner=None): """ Deploys the milvus vector db in standalone mode. """ data = util.get_milvus_config_data() res = create_milvus_accounts(data) if res: logging.info("*** creating a milvus standalone instance") kust = patchEntries(data) mspec = kus.kustom_list("milvus", kust, templates=[], data=data) if owner: kopf.append_owner_reference(mspec['items'], owner) else: cfg.put("state.milvus.spec", mspec) kube.apply(mspec) util.wait_for_pod_ready( r"{.items[?(@.metadata.labels.app\.kubernetes\.io\/instance == 'nuvolaris-milvus')].metadata.name}") milvus_api_host = cfg.get("milvus.host", "MILVUS_API_HOST", "nuvolaris-milvus") milvus_api_port = cfg.get("milvus.host", "MILVUS_API_PORT", "19530") logging.info("*** waiting for milvus api to be available") util.wait_for_http(f"http://{milvus_api_host}:{milvus_api_port}", up_statuses=[200,401], timeout=30) res = create_default_milvus_database(data) logging.info("*** created a milvus standalone instance") return res def create_milvus_accounts(data: dict): """" Creates technical accounts for ETCD and MINIO """ try: # currently we use the ETCD root password, so we skip the ETCD user creation. # res = util.check(etcd.create_etcd_user(data['milvus_etcd_username'],data['milvus_etcd_password'],data['milvus_etcd_prefix']),"create_etcd_milvus_user",True) minioClient = mutil.MinioClient() bucket_policy_names = [] bucket_policy_names.append(f"{data['milvus_bucket_name']}/*") res = util.check(minioClient.add_user(data["milvus_s3_username"], data["milvus_s3_password"]), "create_milvus_s3_user", True) res = util.check(minioClient.make_bucket(data["milvus_bucket_name"]), "create_milvus_s3_bucket", res) return util.check(minioClient.assign_rw_bucket_policy_to_user(data["milvus_s3_username"], bucket_policy_names), "assign_milvus_s3_bucket_policy", res) except Exception as ex: logging.error("Could not create milvus ETCD and MINIO accounts", ex) return False def create_default_milvus_database(data): """ Creates nuvolaris MILVUS custom resources """ logging.info("*** configuring MILVUS database for nuvolaris") adminClient = MilvusAdminClient() res = adminClient.setup_user("nuvolaris", data["nuvolaris_password"], "nuvolaris") if (res): _annotate_nuv_milvus_metadata(data) logging.info("*** configured MILVUS database for nuvolaris") return True return False def _annotate_nuv_milvus_metadata(data): """ annotate nuvolaris configmap with entries for MILVUS connectivity MILVUS_HOST, MILVUS_PORT, MILVUS_TOKEN, MILVUS_DB_NAME this is becasue MINIO """ try: milvus_service = util.get_service( r"{.items[?(@.metadata.labels.app\.kubernetes\.io\/instance == 'nuvolaris-milvus')]}") if (milvus_service): milvus_host = f"{milvus_service['metadata']['name']}.{milvus_service['metadata']['namespace']}.svc.cluster.local" password = data["nuvolaris_password"] openwhisk.annotate(f"milvus_host={milvus_host}") openwhisk.annotate(f"milvus_token=nuvolaris:{password}") openwhisk.annotate("milvus_db_name=nuvolaris") ports = list(milvus_service['spec']['ports']) for port in ports: if (port['name'] == 'milvus'): openwhisk.annotate(f"milvus_port={port['port']}") return None except Exception as e: logging.error(f"failed to annotate MILVUS for nuvolaris: {e}") return None def _add_milvus_user_metadata(ucfg: UserConfig, user_metadata: UserMetadata): """ adds entries for MILVUS connectivity MILVUS_HOST, MILVUS_PORT, MILVUS_TOKEN, MILVUS_DB_NAME """ try: milvus_service = util.get_service( r"{.items[?(@.metadata.labels.app\.kubernetes\.io\/instance == 'nuvolaris-milvus')]}") if (milvus_service): milvus_host = f"{milvus_service['metadata']['name']}.{milvus_service['metadata']['namespace']}.svc.cluster.local" milvus_token = f"{ucfg.get('namespace')}:{ucfg.get('milvus.password')}" user_metadata.add_metadata("MILVUS_HOST", milvus_host) user_metadata.add_metadata("MILVUS_TOKEN", milvus_token) user_metadata.add_metadata("MILVUS_DB_NAME", ucfg.get('milvus.database')) ports = list(milvus_service['spec']['ports']) for port in ports: if (port['name'] == 'milvus'): user_metadata.add_metadata("MILVUS_PORT", port['port']) return None except Exception as e: logging.error(f"failed to build MILVUS metadata for {ucfg.get('namespace')}: {e}") return None def create_ow_milvus(ucfg: UserConfig, user_metadata: UserMetadata, owner=None): logging.info(f"*** configuring MILVUS database for {ucfg.get('namespace')}") adminClient = MilvusAdminClient() username = ucfg.get("namespace") password = ucfg.get("milvus.password") database = ucfg.get("milvus.database") res = adminClient.setup_user(username, password, database) if (res): _add_milvus_user_metadata(ucfg, user_metadata) logging.info(f"*** configured MILVUS database linked to namespace {ucfg.get('namespace')}") return res def delete_ow_milvus(ucfg): logging.info(f"removing MILVUS database {ucfg.get('namespace')}") adminClient = MilvusAdminClient() res = adminClient.remove_user(ucfg.get('namespace'), ucfg.get('milvus.database')) if res: logging.info(f"removed MILVUS database linked to namespace {ucfg.get('namespace')}") return res def delete_by_owner(): spec = kus.build("milvus") res = kube.delete(spec) logging.info(f"delete milvus: {res}") return res def delete_by_spec(): spec = cfg.get("state.milvus.spec") if spec: res = kube.delete(spec) logging.info(f"delete milvus: {res}") return res def delete(owner=None): if owner: return delete_by_owner() else: return delete_by_spec() def patch(status, action, owner=None): """ Called by the operator patcher to create/delete milvus component """ try: logging.info(f"*** handling request to {action} milvus") if action == 'create': msg = create(owner) operator_util.patch_operator_status(status, 'milvus', 'on') else: msg = delete(owner) operator_util.patch_operator_status(status, 'milvus', 'off') logging.info(msg) logging.info(f"*** handled request to {action} milvus") except Exception as e: logging.error('*** failed to update milvus: %s' % e) operator_util.patch_operator_status(status, 'milvus', 'error')