nuvolaris/minio_util.py (75 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # this module wraps mc minio client using admin credentials # to perform various operations import logging import json import subprocess import nuvolaris.config as cfg import nuvolaris.template as ntp import nuvolaris.util as util import os class MinioClient: def __init__(self): self.minio_api_host = cfg.get("minio.host", "MINIO_API_HOST", "nuvolaris-minio") self.minio_api_port = cfg.get("9000", "MINIO_API_PORT", "9000") self.admin_username = cfg.get("minio.admin.user", "MINIO_ADMIN_USER", "minioadmin") self.admin_password = cfg.get("minio.admin.password", "MINIO_ADMIN_PASSWORD", "minioadmin") self.minio_api_url = f"http://{self.minio_api_host}:{self.minio_api_port}" self.alias = "local" # automatically adds the local alias to mc configuration to operate on nuvolaris main MINIO server self.mc("alias","set", self.alias, self.minio_api_url, self.admin_username, self.admin_password) # execute minio commands using the mc cli tools installed by default inside the operator def mc(self, *kwargs): cmd = ["mc"] cmd += list(kwargs) # executing logging.debug(cmd) try: res = subprocess.run(cmd, capture_output=True) returncode = res.returncode output = res.stdout.decode() error = res.stderr.decode() if returncode != 0: logging.error(error) else: logging.info(output) return returncode == 0 except Exception as e: logging.error(e) return e def add_user(self, username, secret_key): """ adds a new minio user to the configured minio instance """ res = util.check(self.mc("admin","user","add", self.alias, username, secret_key),"add_user",True) return util.check(self.init_namespace_alias(username, secret_key),"init_namespace_alias",res) def remove_user(self, username): """ removes a minio user to the configured minio instance """ res = util.check(self.mc("admin","user","remove", self.alias, username),"remove_user",True) return util.check(self.remove_namespace_alias(username),"remove_namespace_alias",res) def make_bucket(self, bucket_name): """ adds a new bucket inside the configured minio instance """ return util.check(self.mc("mb",f"{self.alias}/{bucket_name}"),"make_bucket",True) def force_bucket_remove(self, bucket_name): """ removes unconditionally a bucket """ return util.check(self.mc("rb","--force",f"{self.alias}/{bucket_name}"),"force_bucket_remove",True) def make_public_bucket(self, bucket_name): """ adds a new public bucket to the configured minio instance """ res = util.check(self.make_bucket(bucket_name),"make_bucket",True) return util.check(self.mc("anonymous","-r","set","download",f"{self.alias}/{bucket_name}"),"make_public_bucket",res) def assign_quota_to_bucket(self, bucket_name, quota): """ assign the specified quota on the given bucket """ return util.check(self.mc("quota","set",f"{self.alias}/{bucket_name}","--size", f"{quota}m"),"assign_quota_to_bucket",True) def assign_policy_to_user(self, username, policy): """ assign the specified policy to the given username """ return util.check(self.mc("admin","policy","attach",self.alias,policy,"--user", username),"assign_policy_to_user",True) def add_policy(self, policy, path_to_policy_json): """ add a new policy into minio """ return util.check(self.mc("admin","policy","create",self.alias,policy,path_to_policy_json),"add_policy",True) def remove_policy(self, policy): """ add a new policy into minio """ return util.check(self.mc("admin","policy","remove",self.alias,policy),"remove_policy",True) def render_policy(self,username,template,data): """ uses the given template policy to render a final policy and returns the absolute path to rendered policy file. """ out = f"/tmp/__{username}_{template}" file = ntp.spool_template(template, out, data) return os.path.abspath(file) def assign_rw_bucket_policy_to_user(self,username,bucket_names): """ defines a rw policy template for the specified bucket and assigns it to the given username. """ policy_name = f"{username}_rw_policy" path_to_policy_json = self.render_policy(username,"minio_rw_policy_tpl.json",{"bucket_arns":bucket_names}) res=util.check(self.add_policy(policy_name,path_to_policy_json),"add_policy",True) res=util.check(self.assign_policy_to_user(username,policy_name),"assign_rw_bucket_policy_to_user",res) os.remove(path_to_policy_json) return res def delete_user(self,username): """ removes the user and the corresponding policy """ policy_name = f"{username}_rw_policy" res=util.check(self.remove_user(username),"removed_user",True) return util.check(self.remove_policy(policy_name),"deleted_user_policy",res) def init_namespace_alias(self, namespace, secret_key): """ called to initialize a local minio alias for a namespace user (used to impersonate the user uploading content) """ return util.check(self.mc("alias","set", f"local_{namespace}", self.minio_api_url, namespace, secret_key),"init_namespace_alias_call",True) def remove_namespace_alias(self, namespace): """ called to initialize to remove a local minio alias for a namespace user """ return util.check(self.mc("alias","remove",f"local_{namespace}"),"remove_namespace_alias_call",True) def upload_folder_content(self,origin,bucket): """ uploads the given content using a local alias for the corresponding namespace """ return util.check(self.mc("cp","-r",origin,f"{self.alias}/{bucket}"),"upload_folder_content",True)