actions/devel/minio/command/minio.py (70 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import common.util as ut
import common.minio_util as mutil
import json
import re
from common.command_data import CommandData
from minio.commonconfig import CopySource
class Minio():
"""
Implementation of a Minio Command executor. It will require
a user_data dictionary linked to a specific user.
"""
def __init__(self, user_data):
self._user_data = user_data
self._minio_access_key= ut.get_env_value(user_data,"S3_ACCESS_KEY")
self._minio_secret_key= ut.get_env_value(user_data,"S3_SECRET_KEY")
self._minio_host= ut.get_env_value(user_data,"S3_HOST")
self._minio_port= ut.get_env_value(user_data,"S3_PORT")
self._minio_data_bucket= ut.get_env_value(user_data,"S3_DATA_BUCKET")
self._minio_static_bucket= ut.get_env_value(user_data,"S3_STATIC_BUCKET")
self.validate()
def validate(self):
"""
Validate that the provided user_data contains the appropriate
metadata for being able to submit a postgres command.
"""
if not self._minio_access_key or not self._minio_secret_key or not self._minio_host or not self._minio_port:
raise Exception("user does not have valid MINIO environment set")
def _list_buckets(self, input:CommandData):
print("**** listing user buckets")
mo_client = mutil.build_mo_client(self._minio_host, self._minio_port,self._minio_access_key, self._minio_secret_key)
result = []
buckets = mo_client.list_buckets()
for bucket in buckets:
result.append({"bucket": bucket.name, "creation_date": str(bucket.creation_date)})
input.result(result)
input.status(200)
def _list_bucket_content(self,bucket,input:CommandData):
print(f"**** listing bucket {bucket} content")
mo_client = mutil.build_mo_client(self._minio_host, self._minio_port,self._minio_access_key, self._minio_secret_key)
result = []
objects = mo_client.list_objects(bucket_name=bucket, recursive= True)
for obj in objects:
result.append({"name":obj.object_name,"last_modified": str(obj.last_modified), "size":obj.size})
input.result(result)
input.status(200)
def _rm_bucket_object(self,bucket,filename,input:CommandData):
print(f"**** removing file {filename} inside bucket {bucket}")
mo_client = mutil.build_mo_client(self._minio_host, self._minio_port,self._minio_access_key, self._minio_secret_key)
mo_client.remove_object(bucket, filename)
result = {"result":"OK"}
input.result(result)
input.status(200)
def _mv_bucket_object(self,orig_bucket,orig_filename,dest_bucket,dest_filename,input:CommandData):
print(f"**** moving file {orig_filename} from bucket {orig_bucket} to bucket {dest_bucket} with name {dest_filename}")
mo_client = mutil.build_mo_client(self._minio_host, self._minio_port,self._minio_access_key, self._minio_secret_key)
object_name = mutil.mv_file(mo_client,orig_bucket,orig_filename,dest_bucket,dest_filename)
if object_name:
input.result({"name":object_name})
input.status(200)
else:
input.result(f"move operation failed")
input.status(400)
def _cp_bucket_object(self,orig_bucket,orig_filename,dest_bucket,dest_filename,input:CommandData):
print(f"**** copying file {orig_filename} from bucket {orig_bucket} to bucket {dest_bucket} with name {dest_filename}")
mo_client = mutil.build_mo_client(self._minio_host, self._minio_port,self._minio_access_key, self._minio_secret_key)
object_name = mutil.cp_file(mo_client,orig_bucket,orig_filename,dest_bucket,dest_filename)
if object_name:
input.result({"name":object_name})
input.status(200)
else:
input.result(f"move operation failed")
input.status(400)
def _clean_bucket_content(self,input:CommandData, bucket, pattern=".*", dry_run="true"):
""""
Removes matching content from the specified bucket
:param input the entire input command
:param bucket
:param pattern a regexp specifying the filename to match (defaults to .*)
:param dry_run a boolean to enable dry run execution (defaults to False)
"""
print(f"**** cleaning bucket {bucket} content matching {pattern}. Dry run mode {dry_run}")
mo_client = mutil.build_mo_client(self._minio_host, self._minio_port,self._minio_access_key, self._minio_secret_key)
result = []
objects = mo_client.list_objects(bucket_name=bucket, recursive= True)
for obj in objects:
if re.search(pattern, obj.object_name):
if "true" in dry_run:
result.append({"name":obj.object_name,"last_modified": str(obj.last_modified), "size":obj.size})
else:
removed = mutil.rm_file(mo_client,bucket,obj.object_name)
if removed:
result.append({"name":obj.object_name})
else:
print(f"skipping {obj.object_name} as it does not match {pattern}")
input.result(result)
input.status(200)
def execute(self, input:CommandData):
print(f"**** Minio command to execute {input.command()}")
try:
input.status(400)
input.result(f"could not execute minio command {input.command()} invalid arguments.")
if "ls" in input.command() and not "args" in input.get_metadata():
self._list_buckets(input)
if "ls" in input.command() and "args" in input.get_metadata() and len(input.args()) == 1:
self._list_bucket_content(input.args()[0],input)
if "rm" in input.command() and "args" in input.get_metadata() and len(input.args()) == 2:
self._rm_bucket_object(input.args()[0],input.args()[1],input)
if "mv" in input.command() and "args" in input.get_metadata() and len(input.args()) == 4:
self._mv_bucket_object(input.args()[0],input.args()[1],input.args()[2],input.args()[3], input)
if "cp" in input.command() and "args" in input.get_metadata() and len(input.args()) == 4:
self._cp_bucket_object(input.args()[0],input.args()[1],input.args()[2],input.args()[3], input)
if "clean" in input.command() and "args" in input.get_metadata() and len(input.args()) == 3:
self._clean_bucket_content(input,input.args()[0],input.args()[1],input.args()[2])
except Exception as e:
input.result(f"could not execute minio command {e}")
input.status(400)
return input