nuvolaris/minio_deploy.py (209 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import kopf, logging, time, os import nuvolaris.kube as kube import nuvolaris.kustomize as kus import nuvolaris.config as cfg import nuvolaris.util as util import nuvolaris.minio_util as mutil import nuvolaris.openwhisk as openwhisk import nuvolaris.minio_ingress as minio_ingress import nuvolaris.operator_util as operator_util from nuvolaris.user_config import UserConfig from nuvolaris.user_metadata import UserMetadata from nuvolaris.minio_util import MinioClient def _add_miniouser_metadata(ucfg: UserConfig, user_metadata:UserMetadata): """ adds entries for minio connectivity S3_HOST, S3_PORT, S3_ACCESS_KEY, S3_SECRET_KEY this is becasue MINIO """ try: minio_service = util.get_service("{.items[?(@.spec.selector.app == 'minio')]}") if(minio_service): minio_host = f"{minio_service['metadata']['name']}.{minio_service['metadata']['namespace']}.svc.cluster.local" access_key = ucfg.get('namespace') secret_key = ucfg.get("object-storage.password") user_metadata.add_metadata("S3_PROVIDER","minio") user_metadata.add_metadata("S3_HOST",minio_host) user_metadata.add_metadata("S3_ACCESS_KEY",access_key) user_metadata.add_metadata("S3_SECRET_KEY",secret_key) user_metadata.add_safely_from_cm("S3_API_URL", '{.metadata.annotations.s3_api_url}') user_metadata.add_safely_from_cm("S3_CONSOLE_URL", '{.metadata.annotations.s3_console_url}') ports = list(minio_service['spec']['ports']) for port in ports: if(port['name']=='minio-api'): user_metadata.add_metadata("S3_PORT",port['port']) return None except Exception as e: logging.error(f"failed to build MINIO metadata for {ucfg.get('namespace')}: {e}") return None def find_content_path(filename): absolute_path = os.path.dirname(__file__) relative_path = "../deploy/content" return os.path.join(absolute_path, relative_path, filename) def create(owner=None): logging.info(f"*** configuring minio standalone") data = util.get_minio_config_data() tplp = ["00-minio-pvc.yaml","01-minio-dep.yaml","02-minio-svc.yaml"] if(data['affinity'] or data['tolerations']): tplp.append("affinity-tolerance-dep-core-attach.yaml") kust = kus.patchTemplates("minio", tplp, data) spec = kus.kustom_list("minio", kust, templates=[], data=data) if owner: kopf.append_owner_reference(spec['items'], owner) else: cfg.put("state.minio.spec", spec) res = kube.apply(spec) # dynamically detect minio pod and wait for readiness util.wait_for_pod_ready("{.items[?(@.metadata.labels.app == 'minio')].metadata.name}") create_nuv_storage(data) minio_ingress.create_minio_ingresses(data, owner) logging.info("*** configured minio standalone") return res def _annotate_nuv_metadata(data): """ annotate nuvolaris configmap with entries for minio connectivity S3_ENDPOINT, S3_PORT, S3_ACCESS_KEY, S3_SECRET_KEY this is becasue MINIO """ try: minio_service = util.get_service("{.items[?(@.spec.selector.app == 'minio')]}") if(minio_service): minio_host = f"{minio_service['metadata']['name']}.{minio_service['metadata']['namespace']}.svc.cluster.local" access_key = data["minio_nuv_user"] secret_key = data["minio_nuv_password"] openwhisk.annotate(f"s3_host={minio_host}") openwhisk.annotate(f"s3_access_key={access_key}") openwhisk.annotate(f"s3_secret_key={secret_key}") openwhisk.annotate("s3_provider=minio") ports = list(minio_service['spec']['ports']) for port in ports: if(port['name']=='minio-api'): openwhisk.annotate(f"s3_port={port['port']}") return None except Exception as e: logging.error(f"failed to build minio_host for nuvolaris: {e}") return None def create_nuv_storage(data): """ Creates nuvolaris MINIO custom resources """ logging.info(f"*** configuring MINIO storage for nuvolaris") # introduce a 10 seconds delay to be sure that MINIO server is up and running completely as pod readines not to be enough time.sleep(10) minioClient = mutil.MinioClient() res = minioClient.add_user(data["minio_nuv_user"], data["minio_nuv_password"]) if(res): _annotate_nuv_metadata(data) bucket_policy_names = [] logging.info("*** adding nuvolaris MINIO data bucket") res = minioClient.make_bucket("nuvolaris-data") bucket_policy_names.append("nuvolaris-data/*") if(res): openwhisk.annotate("s3_bucket_data=nuvolaris-data") logging.info("*** adding nuvolaris MINIO static public bucket") res = minioClient.make_public_bucket("nuvolaris-web") bucket_policy_names.append("nuvolaris-web/*") if(res): openwhisk.annotate("s3_bucket_static=nuvolaris-web") content_path = find_content_path("index.html") if(content_path): logging.info(f"uploading example content to nuvolaris-web from {content_path}") res = minioClient.upload_folder_content(content_path,"nuvolaris-web") else: logging.warn("could not find example static content to upload") if(len(bucket_policy_names)>0): logging.info(f"granting rw access to created policies under username {data['minio_nuv_user']}") minioClient.assign_rw_bucket_policy_to_user(data["minio_nuv_user"],bucket_policy_names) logging.info("*** configured MINIO storage for nuvolaris") def assign_bucket_quota(bucket_name, quota, minioClient:MinioClient): if not quota.lower() in ['auto'] and quota.isnumeric(): logging.info(f"*** setting quota on bucket {bucket_name} with hardlimit to {quota}m") res = minioClient.assign_quota_to_bucket(bucket_name,quota) if res: logging.info(f"*** quota on bucket {bucket_name} set successfully") else: logging.warn(f"*** skipping quota set on bucket {bucket_name}. Requested quota values is {quota}") def create_ow_storage(state, ucfg: UserConfig, user_metadata: UserMetadata, owner=None): minioClient = mutil.MinioClient() namespace = ucfg.get("namespace") secretkey = ucfg.get("object-storage.password") logging.info(f"*** configuring storage for namespace {namespace}") res = minioClient.add_user(namespace, secretkey) state['storage_user']=res if(res): _add_miniouser_metadata(ucfg, user_metadata) bucket_policy_names = [] if(ucfg.get('object-storage.data.enabled')): bucket_name = ucfg.get('object-storage.data.bucket') logging.info(f"*** adding private bucket {bucket_name} for {namespace}") res = minioClient.make_bucket(bucket_name) bucket_policy_names.append(f"{bucket_name}/*") state['storage_data']=res if(res): user_metadata.add_metadata("S3_BUCKET_DATA",bucket_name) ucfg.put("S3_BUCKET_DATA",bucket_name) if ucfg.exists('object-storage.quota'): assign_bucket_quota(bucket_name,ucfg.get('object-storage.quota'), minioClient) if(ucfg.get('object-storage.route.enabled')): bucket_name = ucfg.get("object-storage.route.bucket") logging.info(f"*** adding public bucket {bucket_name} for {namespace}") res = minioClient.make_public_bucket(bucket_name) bucket_policy_names.append(f"{bucket_name}/*") if(res): user_metadata.add_metadata("S3_BUCKET_STATIC",bucket_name) ucfg.put("S3_BUCKET_STATIC",bucket_name) if ucfg.exists('object-storage.quota'): assign_bucket_quota(bucket_name,ucfg.get('object-storage.quota'), minioClient) content_path = find_content_path("index.html") if(content_path): logging.info(f"uploading example content to {bucket_name} from {content_path}") res = minioClient.upload_folder_content(content_path,bucket_name) else: logging.warn("could not find example static content to upload") state['storage_route']=res if(len(bucket_policy_names)>0): logging.info(f"granting rw access to created policies under namespace {namespace}") minioClient.assign_rw_bucket_policy_to_user(namespace,bucket_policy_names) return state def delete_ow_storage(ucfg): minioClient = mutil.MinioClient() namespace = ucfg.get("namespace") if(ucfg.get('object-storage.data.enabled')): bucket_name = ucfg.get('object-storage.data.bucket') logging.info(f"*** removing private bucket {bucket_name} for {namespace}") res = minioClient.force_bucket_remove(bucket_name) if(ucfg.get('object-storage.route.enabled')): bucket_name = ucfg.get("object-storage.route.bucket") logging.info(f"*** removing public bucket {bucket_name} for {namespace}") res = minioClient.force_bucket_remove(bucket_name) return minioClient.delete_user(namespace) def delete_by_owner(): spec = kus.build("minio") res = kube.delete(spec) logging.info(f"delete minio: {res}") return res def delete_by_spec(): spec = cfg.get("state.minio.spec") res = False if spec: res = kube.delete(spec) logging.info(f"delete minio: {res}") return res def delete(owner=None): data = util.get_minio_config_data() minio_ingress.delete_minio_ingresses(data, owner) if owner: return delete_by_owner() else: return delete_by_spec() def patch(status, action, owner=None): """ Called by the operator patcher to create/delete minio component """ try: logging.info(f"*** handling request to {action} minio") if action == 'create': msg = create(owner) operator_util.patch_operator_status(status,'minio','on') else: msg = delete(owner) operator_util.patch_operator_status(status,'minio','off') logging.info(msg) logging.info(f"*** hanlded request to {action} minio") except Exception as e: logging.error('*** failed to update minio: %s' % e) operator_util.patch_operator_status(status,'minio','error') def patch_ingresses(status, action, owner=None): """ Called by the operator patcher to create/delete minio component """ try: logging.info(f"*** handling request to {action} minio ingresses") data = util.get_minio_config_data() if action == 'update': msg = minio_ingress.create_minio_ingresses(data, owner) operator_util.patch_operator_status(status,'minio-ingresses','on') logging.info(msg) logging.info(f"*** hanlded request to {action} minio ingresses") except Exception as e: logging.error('*** failed to update minio ingresses: %s' % e) operator_util.patch_operator_status(status,'minio-ingresses','error')