src/ab/utils/oss_util.py (44 lines of code) (raw):

import subprocess from ab.utils.abt_config import config ossutil = config.get_value("ossutil") def download(bucket: str, local_file: str, oss_file: str, update: bool, is_folder: bool): command_list = [ossutil, " cp "] if is_folder: command_list.append(" -r ") command_list.append(" oss://") command_list.append(bucket) command_list.append("/") command_list.append(oss_file) command_list.append(" ") command_list.append(local_file) if update: command_list.append(" -u") subprocess.run("".join(command_list), shell=True) def upload(bucket: str, local_file: str, oss_file: str, update: bool, is_folder: bool): command_list = [ossutil, " cp "] if is_folder: command_list.append(" -r ") command_list.append(local_file) command_list.append(" ") command_list.append(" oss://") command_list.append(bucket) command_list.append("/") command_list.append(oss_file) if update: command_list.append(" -u") subprocess.run("".join(command_list), shell=True) def create_directory(bucket_name, dir_name): command_list = [ossutil, " mkdir ", " oss://", bucket_name, "/", dir_name] subprocess.run("".join(command_list), shell=True) def create_directories(oss_mount): for oss in oss_mount: create_directory(oss.get_bucket_name(), oss.get_bucket_path()) def list_file(bucket_name, dir_name): command_list = [ossutil, " ls ", " oss://", bucket_name, "/", dir_name] subprocess.run("".join(command_list), shell=True) def rm_object(bucket_name, dir_name): command_list = [ossutil, " rm ", " oss://", bucket_name, "/", dir_name, " -r -f"] subprocess.run("".join(command_list), shell=True) def desc_file(file_name): command_list = [ossutil, " cat ", file_name] subprocess.run("".join(command_list), shell=True)