mds_plugin/object_store.py (1,104 lines of code) (raw):
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Sub-Module to manage OCI Object Storage"""
from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core, configuration
# Number of threads used in parallel for bucket file operations
NTHREAD = 50
def get_object_store_namespace(config=None):
"""Returns the object store namespace_name
Args:
config (object): An OCI config object or None.
Returns:
The namespace_name
"""
import oci.object_storage
# Check if object_store_namespace_name is already cached in config
namespace_name = config.get("object_store_namespace_name")
if not namespace_name:
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = os_client.get_namespace().data
# Cache object_store_namespace_name in config
config["object_store_namespace_name"] = namespace_name
return namespace_name
def format_buckets_listing(buckets, current_bucket=None):
"""Returns a formatted list of buckets.
Args:
buckets (list): A list of buckets objects.
Returns:
The formatted list as string
"""
import re
out = ""
i = 1
for b in buckets:
# Shorten to 24 chars max, remove linebreaks
name = re.sub(r'[\n\r]', ' ',
b.name[:22] + '..'
if len(b.name) > 24
else b.name)
index = f"*{i:>3}" if current_bucket == b.name else f"{i:>4}"
out += (f"{index} {name:24} {b.time_created:%Y-%m-%d %H:%M}\n")
i += 1
return out
def sizeof_fmt(size):
"""Returns a formatted file size
Args:
size (integer): Size in bytes
Returns:
The formatted size as string
"""
if size is None:
return "0B"
if size < 1024:
return f"{size}B"
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(size) < 1024.0:
return "%3.1f%s%s" % (size, unit, 'B')
size /= 1024.0
return "%.1f%s%s" % (size, 'Y', 'B')
def format_bucket_objects_listing(bucket_list_objects):
"""Returns a formatted list of buckets.
Args:
buckets (list): A list of buckets objects.
Returns:
The formatted list as string
"""
import re
import math
out = ""
i = 1
for p in bucket_list_objects.prefixes:
# Shorten to 24 chars max, remove linebreaks
name = re.sub(r'[\n\r]', ' ',
p[:63] + '..'
if len(p) > 65
else p)
out += (f"{i:>4} {name:65} (Prefix)\n")
i += 1
for o in bucket_list_objects.objects:
# Shorten to 24 chars max, remove linebreaks
name = re.sub(r'[\n\r]', ' ',
o.name[:63] + '..'
if len(o.name) > 65
else o.name)
size = sizeof_fmt(o.size)
time = f"{o.time_modified:%Y-%m-%d %H:%M}" \
if o.time_modified is not None else ""
out += (f"{i:>4} {name:65} {size:8} {time:16}\n")
i += 1
return out
@plugin_function('mds.list.buckets', shell=True, cli=True, web=True)
def list_buckets(**kwargs):
"""Lists object store buckets
This function will list all buckets of the compartment with the
given compartment_id.
Args:
**kwargs: Optional parameters
Keyword Args:
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
raise_exceptions (bool): If set to True exceptions are raised
interactive (bool): Whether output is more descriptive
return_formatted (bool): If set to true, a list object is returned.
Returns:
A list of dicts representing the buckets
"""
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
config=config)
import oci.object_storage
import oci.util
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# List the buckets
buckets = os_client.list_buckets(
namespace_name=namespace_name,
compartment_id=compartment_id).data
if len(buckets) < 1 and interactive:
print("This compartment contains no buckets.")
if return_formatted:
return format_buckets_listing(
buckets=buckets, current_bucket=bucket_name)
else:
# return compartments in JSON text output
return oci.util.to_dict(buckets)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except (ValueError, oci.exceptions.ClientError) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.get.bucket')
def get_bucket(bucket_name=None, compartment_id=None, config=None,
ignore_current=False, interactive=True):
"""Get bucket
Args:
bucket_name (str): The name of the policy
compartment_id (str): The OCID of the compartment.
config (object): An OCI config object or None.
ignore_current (bool): Ignores the current bucket
interactive (bool): Whether exceptions should be raised
Returns:
The policy object or None
"""
# Get the current config
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if not ignore_current:
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
import oci.object_storage
import mysqlsh
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# if a bucket_name was provided, look up the bucket directly
if bucket_name is not None:
try:
bucket = os_client.get_bucket(
namespace_name=namespace_name, bucket_name=bucket_name).data
return bucket
except oci.exceptions.ServiceError as e:
if not interactive:
raise
if e.status == 404:
print(f'The bucket with the name {bucket_name} was not '
f'found.')
else:
print(f'ERROR: {e.message}. (Code: {e.code}; '
f'Status: {e.status})')
return
# Get item list
buckets = os_client.list_buckets(
namespace_name=namespace_name,
compartment_id=compartment_id).data
# Print the user list
item_list = format_buckets_listing(buckets)
print(f"Buckets:\n{item_list}")
# Let the user choose from the list
bucket = core.prompt_for_list_item(
item_list=buckets, prompt_caption=("Please enter the name or index "
"of the bucket: "),
item_name_property="name")
return bucket
except oci.exceptions.ServiceError as e:
if not interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
if not interactive:
raise
print(f'ERROR: {e}')
return
@plugin_function('mds.create.bucket')
def create_bucket(bucket_name=None, compartment_id=None, config=None,
interactive=True, return_object=False):
"""Creates a new object store buckets
Args:
bucket_name (str): The name of the new bucket.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
interactive (bool): Whether exceptions should be raised
return_object (bool): Whether the bucket object should be returned
Returns:
None or the created bucket object
"""
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.object_storage
import mysqlsh
if not interactive and bucket_name is None:
raise ValueError("A bucket_name needs to be provided")
elif bucket_name is None:
print("Creating a new object store bucket ...\n")
# Get a name
bucket_name = mysqlsh.globals.shell.prompt(
"Please enter the name for the new bucket: ",
{'defaultValue': ''}).strip()
if bucket_name == "":
print("Operation cancelled.")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
create_bucket_details = oci.object_storage.models.CreateBucketDetails(
name=bucket_name,
compartment_id=compartment_id
)
bucket = os_client.create_bucket(
namespace_name=namespace_name,
create_bucket_details=create_bucket_details).data
if return_object:
return bucket
else:
print(f"Object Store Bucket {bucket_name} has being created.\n")
except oci.exceptions.ServiceError as e:
if not interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except (ValueError, oci.exceptions.ClientError) as e:
if not interactive:
raise
print(f'ERROR: {e}')
return
@plugin_function('mds.delete.bucket')
def delete_bucket(bucket_name=None, compartment_id=None, config=None,
interactive=True):
"""Deletes an object store bucket
Args:
bucket_name (str): The name of the bucket.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
interactive (bool): If set to false, function returns true on success
Returns:
None or True
"""
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.object_storage
import mysqlsh
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id,
config=config, ignore_current=True)
if bucket is None:
print("Operation Cancelled.\n")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# Check if the bucket still has objects
object_list = os_client.list_objects(
namespace_name=namespace_name, bucket_name=bucket.name).data.objects
obj_count = len(object_list)
if obj_count > 0:
print(f"The bucket {bucket.name} contains {obj_count} "
f"object{'s' if obj_count > 1 else ''}.")
if interactive:
# Prompt the user for confirmation
prompt = mysqlsh.globals.shell.prompt(
f"Are you sure you want to delete the bucket {bucket.name} "
f"[yes/NO]: ",
{'defaultValue': 'no'}).strip().lower()
if prompt != "yes":
print("Deletion aborted.\n")
return
if obj_count > 0:
delete_bucket_object(
name="*", bucket_name=bucket.name,
compartment_id=compartment_id, config=config, interactive=False)
# Check if the bucket still has PARs
pars = os_client.list_preauthenticated_requests(
namespace_name=namespace_name, bucket_name=bucket.name).data
for p in pars:
os_client.delete_preauthenticated_request(
namespace_name=namespace_name, bucket_name=bucket.name,
par_id=p.id)
os_client.delete_bucket(
namespace_name=namespace_name, bucket_name=bucket.name)
if interactive:
print(f"Bucket {bucket.name} deleted successfully.")
else:
return True
except oci.exceptions.ServiceError as e:
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
print(f'ERROR: {e}')
return
def delete_file_from_list_from_bucket(index, os_client, objects,
namespace_name, bucket_name,
thread_count):
"""Deletes a file from a given file list
Args:
index (int): The thread index, specifying which files from the list to
upload
os_client (object): An oci object store client instance
files (list): The list of filenames as strings
namespace_name (str): The OCI object store namespace of the tenancy
bucket_name (str): The name of the bucket to upload to
Returns:
None
"""
import os.path
for o in objects:
if hash(o.name) % thread_count == index:
os_client.delete_object(
namespace_name=namespace_name, bucket_name=bucket_name,
object_name=o.name)
@plugin_function('mds.delete.bucketObject', shell=True, cli=True, web=True)
def delete_bucket_object(name=None, **kwargs):
"""Deletes an object store bucket objects
Args:
name (str): The name of the object, can include * to match multiple
objects
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the bucket.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): If set to false, function returns true on success
Returns:
None
"""
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
bucket_name = kwargs.get("bucket_name")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
# Get the active config and compartment
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
import oci.object_storage
import mysqlsh
import re
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id,
config=config)
if bucket is None:
if interactive:
print("Operation Cancelled.\n")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# If the user specified * as name, delete all
if name and (name == '*' or '*' in name):
# Get object list
objects = oci.pagination.list_call_get_all_results(
os_client.list_objects,
namespace_name=namespace_name,
bucket_name=bucket.name,
limit=1000).data.objects
# Filter list
if name != '*':
name = name.lower()
# Filter list if PARs
if '*' in name:
name_pattern = '^' + name.replace('*', '.+')
objects = [obj for obj in objects
if re.search(name_pattern, obj.name.lower())]
else:
objects = [obj for obj in objects
if name == obj.name.lower()]
# Get object count
obj_count = len(objects)
if obj_count == 0:
if interactive:
print("No matching objects found for deletion.")
return
else:
raise ValueError("No matching objects found for deletion.")
# Prompt the user for confirmation
if interactive:
prompt = mysqlsh.globals.shell.prompt(
f"Are you sure you want to delete {obj_count} object"
f"{'s' if obj_count > 1 else ''} from {bucket.name} "
f"[yes/NO]: ",
{'defaultValue': 'no'}).strip().lower()
if prompt != "yes":
print("Deletion aborted.\n")
return
# Delete all objects
print(f"Deleting {obj_count} "
f"object{'s' if obj_count > 1 else ''}.")
import threading
thread_count = NTHREAD if obj_count > NTHREAD else obj_count
ths = [threading.Thread(
target=delete_file_from_list_from_bucket,
args=(i, os_client, objects, namespace_name, bucket.name,
thread_count))
for i in range(thread_count)]
for th in ths:
th.daemon = True
th.start()
for th in ths:
th.join()
if interactive:
print(f"Bucket object{'s' if '*' in name else ''} "
f"deleted successfully.")
elif name:
os_client.delete_object(
namespace_name=namespace_name, bucket_name=bucket.name,
object_name=name)
if interactive:
print(f"Bucket object '{name}' deleted successfully.")
elif interactive:
# Get object list
bucket_objects = oci.pagination.list_call_get_all_results(
os_client.list_objects,
namespace_name=namespace_name,
bucket_name=bucket.name,
limit=1000).data.objects
print(format_bucket_objects_listing(bucket_objects=bucket_objects))
obj_summary = core.prompt_for_list_item(
item_list=bucket_objects,
prompt_caption="Please enter the index or name of an object: ",
item_name_property="name")
if obj_summary is None:
print("Operation cancelled.")
return
name = obj_summary.name
os_client.delete_object(
namespace_name=namespace_name, bucket_name=bucket.name,
object_name=name)
print(f"Bucket object '{name}' deleted successfully.")
else:
if interactive:
print('No object name given.')
return
raise ValueError("No object name given.")
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'Could not create the bucket objects.\n'
f'ERROR: {str(e)}')
def bucket_object_upload_progress_callback(bytes_uploaded):
print(f"Additional {sizeof_fmt(bytes_uploaded)} uploaded.")
@plugin_function('mds.create.bucketObject')
def create_bucket_object(file_name=None, name=None,
bucket_name=None, file_content=None,
compartment_id=None, config=None,
interactive=True):
"""Creates a new object store bucket object
Args:
file_name (str): The name of the file to upload
name (str): The name of the file object to create
bucket_name (str): The name of the new bucket.
file_content (str): The contents of the file
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
interactive (bool): Whether exceptions should be raised
Returns:
None
"""
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
except ValueError as e:
print(f"ERROR: {str(e)}")
return
import oci.object_storage
import mysqlsh
import os.path
import io
if interactive and file_name is None and file_content is None:
print("Creating a new bucket object ...\n")
# Get a name (if no file_content was given)
if file_name is None and file_content is None:
file_name = mysqlsh.globals.shell.prompt(
"Please enter the path to a local file to upload: ",
{'defaultValue': ''}).strip()
if file_name == "":
print("Operation cancelled.")
return
if file_content is None:
file_name = os.path.abspath(
os.path.expanduser(file_name))
if not os.path.exists(file_name):
print(f"Cannot find the file {file_name}.")
return
if name is None and file_name:
name = os.path.basename(file_name)
if name is None:
print(f"No name for the bucked file specified.")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
try:
# If the file content was given as string
if file_content is not None:
with io.StringIO() as f:
f.write(file_content)
f.seek(0)
# put_object returns a response object with data None
os_client.put_object(
namespace_name=namespace_name, bucket_name=bucket_name,
object_name=name, put_object_body=f)
else:
# upload manager will automatically use singlepart uploads if the
# part size (in bytes) is less than the file size
part_size = oci.object_storage.transfer.constants.DEFAULT_PART_SIZE
upload_manager = oci.object_storage.UploadManager(
object_storage_client=os_client,
allow_parallel_uploads=True, parallel_process_count=5)
upload_manager.upload_file(
namespace_name=namespace_name, bucket_name=bucket_name,
object_name=name,
file_path=file_name,
part_size=part_size,
progress_callback=bucket_object_upload_progress_callback)
except oci.exceptions.ServiceError as e:
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
print(f'ERROR: {e}')
return
if interactive:
print("Bucket object created successfully.")
else:
return True
def bucket_objects_upload_progress_callback(file_path, bytes_uploaded, total_file_size, send_gui_message):
if send_gui_message is not None:
send_gui_message("data", {
"file_path": file_path,
"bytes_uploaded": bytes_uploaded,
"total_file_size": total_file_size})
else:
print(f'File: {file_path} {bytes_uploaded=} {total_file_size=}')
@plugin_function('mds.create.bucketObjects', shell=True, cli=True, web=True)
def create_bucket_objects(file_paths, prefix, **kwargs):
"""Creates a new object store bucket object
Args:
file_paths (list): The name of the file to upload
prefix (str): The name of the file object to create
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the new bucket.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): Whether exceptions should be raised
raise_exceptions (bool): If true exceptions are raised
send_gui_message (object): The function to send a message to he GUI.
fix_extension (bool): If set to true, the extension is changed to lower case.
Returns:
None
"""
import oci.object_storage
import os.path
import traceback
from mds_plugin.object_store_uploader import parallel_bucket_upload
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
bucket_name = kwargs.get("bucket_name")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
send_gui_message = kwargs.get("send_gui_message")
fix_ext = kwargs.get("fix_extension", False)
# Get the active config and compartment
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
if file_paths is None or len(file_paths) == 0:
raise Exception("No files given.")
if bucket_name is None or bucket_name == "":
bucket = get_bucket(
compartment_id=compartment_id, config=config, ignore_current=True, interactive=interactive)
if bucket is None:
return
bucket_name = bucket.name
if interactive:
print("Creating a new bucket objects ...\n")
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# upload manager will automatically use single part uploads if the
# part size (in bytes) is less than the file size
# part_size = oci.object_storage.transfer.constants.DEFAULT_PART_SIZE
part_size = 262144
def report_error(e, file_path):
if isinstance(e, oci.exceptions.ServiceError):
if send_gui_message is not None:
send_gui_message(
"data", {
"filePath": file_path,
"error": f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})'})
elif interactive:
print(
f'{file_path} - ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
elif isinstance(e, oci.exceptions.ServiceError):
if send_gui_message is not None:
send_gui_message(
"data", {
"filePath": file_path,
"error": f'File {file_path} cannot be found. ERROR {e}'})
elif interactive:
print(f'{file_path} - ERROR: {e}')
else:
if send_gui_message is not None:
send_gui_message("data", {
"filePath": file_path,
"error": f'ERROR: {e}'})
elif interactive:
print(f'{file_path} - ERROR: {e}')
def progress_callback(data):
if data["status"] == "ERROR":
report_error(data["error"], data["file_path"])
elif data["status"] == "PROGRESS":
bucket_objects_upload_progress_callback(data["file_path"], data["bytes_uploaded"], data["file_size"], send_gui_message)
def fix_extension(file_path):
if fix_ext:
root, ext = os.path.splitext(file_path)
return root + ext.lower()
else:
return file_path
try:
parallel_bucket_upload(
files=[
{
"object_name": prefix + fix_extension(os.path.basename(f)),
"file_path": os.path.expanduser(f),
}
for f in file_paths
],
status_fn=progress_callback,
os_client=os_client,
namespace=namespace_name,
bucket_name=bucket_name,
processes_per_file=5,
part_size=part_size,
num_workers=8,
)
except Exception as e:
report_error(e, None)
except Exception as e:
if raise_exceptions:
raise
print(f'Could not create the bucket objects.\n'
f'ERROR: {str(e)}')
@plugin_function('mds.list.bucketObjects', shell=True, cli=True, web=True)
def list_bucket_objects(**kwargs):
"""Lists bucket object
This function will list all bucket objects of the bucket with the
given bucket_name.
Args:
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the bucket
name (str): Then name of the bucket object, can include *
to match multiple objects
prefix (str): The string to use for matching against the start of object names in a list query
delimiter (str): When this parameter is set, only objects whose names do not contain the delimiter character
(after an optionally specified prefix) are returned in the objects key of the response body.
Scanned objects whose names contain the delimiter have the part of their name up to the first occurrence
of the delimiter (including the optional prefix) returned as a set of prefixes. Note that only "/" is a
supported delimiter character at this time.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): If set to false exceptions are raised
return_formatted (bool): If set to true, a list object is returned.
Returns:
A list of dicts representing the bucket objects or a string or none
"""
bucket_name = kwargs.get('bucket_name')
name = kwargs.get('name')
prefix = kwargs.get('prefix')
delimiter = kwargs.get('delimiter')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if bucket_name is None:
bucket_name = configuration.get_current_bucket_name(
config=config)
import oci.object_storage
import oci.util
import re
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id, config=config,
interactive=interactive)
if bucket is None:
print("Operation cancelled.")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# List the bucket objects
# bucket_objects = os_client.list_objects(
# namespace_name=namespace_name,
# bucket_name=bucket.name,
# fields="name,size,timeModified").data.objects
bucket_list_objects = oci.pagination.list_call_get_all_results(
os_client.list_objects,
namespace_name=namespace_name,
bucket_name=bucket.name,
prefix=prefix,
delimiter=delimiter,
fields="name,size,timeModified",
limit=1000).data
# Filter list if PARs
if name and name != '*':
name = name.lower()
# Filter list if PARs
if '*' in name:
name_pattern = '^' + name.replace('*', '.+')
bucket_list_objects.objects = [obj for obj in bucket_list_objects.objects
if re.search(name_pattern, obj.name.lower())]
else:
bucket_list_objects.objects = [obj for obj in bucket_list_objects.objects
if name == obj.name.lower()]
if len(bucket_list_objects.prefixes) + len(bucket_list_objects.objects) < 1 and interactive:
if name:
print(f"The bucket {bucket.name} contains no objects matching "
f"the object name {name}.")
else:
print(f"The bucket {bucket.name} contains no objects.")
return
if return_formatted:
return format_bucket_objects_listing(bucket_list_objects=bucket_list_objects)
else:
# return compartments in JSON text output
return oci.util.to_dict(bucket_list_objects)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except (ValueError, oci.exceptions.ClientError) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.get.bucketObject')
def get_bucket_object(name=None, file_name=None, bucket_name=None,
compartment_id=None, config=None,
no_error_on_not_found=False):
"""Get a bucket object by name
This function will either save the file to disk, if file_name is given
or return the contents as a string
Args:
name (str): If set to JSON, output is formatted that way.
file_name (str): The name of the file that should be created.
bucket_name (str): The name of the bucket.
compartment_id (str): The OCID of the compartment
config (object): An OCI config object or None.
no_error_on_not_found (bool): Whether to print out an error on 404
Returns:
A list of dicts representing the bucket objects or a string
"""
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
except ValueError as e:
print(f"ERROR: {str(e)}")
return
import oci.object_storage
import oci.util
import os.path
import io
# Get bucket
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id, config=config)
if bucket is None:
print("Operation cancelled.")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
try:
# If the user did not specify an name
if name is None:
# Let the user select an object
bucket_objects = os_client.list_objects(
namespace_name=namespace_name,
bucket_name=bucket.name,
fields="name,size,timeModified").data.objects
print(format_bucket_objects_listing(bucket_objects=bucket_objects))
obj_summary = core.prompt_for_list_item(
item_list=bucket_objects,
prompt_caption="Please enter the index or name of an object: ",
item_name_property="name")
if obj_summary is None:
print("Operation cancelled.")
return
name = obj_summary.name
# Look up the object by name
obj = os_client.get_object(
namespace_name=namespace_name, bucket_name=bucket.name,
object_name=name)
if file_name is not None:
file_name = os.path.abspath(
os.path.expanduser(file_name))
with open(file_name, 'wb') as f:
for chunk in obj.data.raw.stream(1024 * 1024,
decode_content=False):
f.write(chunk)
print(f"File {file_name} was written to disk.")
else:
with io.BytesIO() as f:
for chunk in obj.data.raw.stream(
1024 * 1024, decode_content=False):
f.write(chunk)
contents = f.getvalue().decode("utf-8")
return contents
except oci.exceptions.ServiceError as e:
if not (no_error_on_not_found and e.status == 404):
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
print(f'ERROR: {e}')
return
def download_text_file(url):
"""Downloads a file from a given URL
Args:
url (str): The URL to the text file to download
Returns:
The contents of the file decoded to utf-8 or None
"""
import urllib.request
try:
response = urllib.request.urlopen(url)
data = response.read()
return data.decode('utf-8')
except urllib.error.URLError as e:
if e.reason == 'CERTIFICATE_VERIFY_FAILED':
try:
install_ssl_certificates()
response = urllib.request.urlopen(url)
data = response.read()
return data.decode('utf-8')
except urllib.error.URLError as e:
print(f"Could not download file from {url}\nERROR: {str(e)}")
else:
print(f"Could not download file from {url}\nERROR: {str(e)}")
def install_ssl_certificates():
import os
import os.path
import ssl
import stat
import subprocess
import sys
# cSpell:ignore cafile certifi chdir IRGRP IROTH IRUSR IWGRP IWUSR IXGRP
# cSpell:ignore IXOTH IXUSR
STAT_0o775 = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
| stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
| stat.S_IROTH | stat.S_IXOTH)
openssl_dir, openssl_cafile = os.path.split(
ssl.get_default_verify_paths().openssl_cafile)
# print(" -- pip install --upgrade certifi")
# subprocess.check_call([sys.executable,
# "-E", "-s", "-m", "pip", "install", "--upgrade", "certifi"])
import certifi
print("Installing SSL certificate...")
# change working directory to the default SSL directory
os.chdir(openssl_dir)
relpath_to_certifi_cafile = os.path.relpath(certifi.where())
print(" -- removing any existing file or link")
try:
os.remove(openssl_cafile)
except FileNotFoundError:
pass
print(" -- creating symlink to certifi certificate bundle")
os.symlink(relpath_to_certifi_cafile, openssl_cafile)
print(" -- setting permissions")
os.chmod(openssl_cafile, STAT_0o775)
print(" -- update complete")
def bucket_dir_object_upload_progress_callback(bytes_uploaded):
# Only print . if bytes_uploaded is > 1MB
if bytes_uploaded >= 1024*1024*1024:
print(".")
def upload_file_from_list_to_bucket(index, upload_manager, files,
namespace_name, bucket_name,
object_name_prefix):
"""Uploads a file from a file list
Args:
index (int): The thread index, specifying which files from the list to
upload
upload_manager (object): An oci upload_manager instance
files (list): The list of filenames as strings
namespace_name (str): The OCI object store namespace of the tenancy
bucket_name (str): The name of the bucket to upload to
Returns:
None
"""
import os.path
if object_name_prefix is None:
object_name_prefix = ""
for file_name in files:
if hash(file_name) % NTHREAD == index:
file_size = sizeof_fmt(os.path.getsize(file_name))
object_name = object_name_prefix + os.path.basename(file_name)
print(f"{object_name} ({file_size}) ...")
upload_manager.upload_file(
namespace_name=namespace_name, bucket_name=bucket_name,
object_name=object_name,
file_path=file_name)
def create_bucket_objects_from_local_dir(local_dir_path=None, bucket_name=None,
object_name_prefix=None,
compartment_id=None, config=None,
interactive=True):
"""Imports a local dump on a DB System
Args:
local_dir_path (str): The directory to upload
bucket_name (str): The name of the bucket
compartment_id (str): The OCID of the compartment
config (object): An OCI config object or None.
Returns:
None
"""
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
except ValueError as e:
print(f"ERROR: {str(e)}")
return
import oci.mysql
from pathlib import Path
from os import listdir
import os.path
import datetime
import mysqlsh
# Get a local_dir_path
if local_dir_path is None and interactive:
local_dir_path = mysqlsh.globals.shell.prompt(
"Please enter the file path of the directory to upload: ",
{'defaultValue': ''}).strip()
if local_dir_path == "":
print("Operation cancelled.\n")
return
elif local_dir_path is None and not interactive:
print("No directory path specified.")
return
if object_name_prefix is None and interactive:
object_name_prefix = mysqlsh.globals.shell.prompt(
"Please enter the prefix to use for the object names, e.g. /, "
"or leave empty []: ",
{'defaultValue': ''}).strip()
local_dir_path = os.path.abspath(
os.path.expanduser(local_dir_path))
if not os.path.exists(local_dir_path):
print(f"Cannot find the file directory {local_dir_path}.")
return
file_list = [os.path.join(local_dir_path, name) for name
in os.listdir(local_dir_path)
if os.path.isfile(os.path.join(local_dir_path, name))]
if len(file_list) < 1:
print(f"File directory {local_dir_path} contains no files.")
return
# Prompt the user for confirmation
if interactive:
prompt = mysqlsh.globals.shell.prompt(
f"{len(file_list)} file{'s' if len(file_list)>1 else ''} will be "
f"uploaded from {local_dir_path}.\nDo you want to continue? "
f"[YES/no]: ",
{'defaultValue': 'yes'}).strip().lower()
if prompt != "yes":
print("Operation cancelled.\n")
return
# Get the bucket
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id, config=config)
if bucket is None:
print("Operation cancelled.\n")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
try:
upload_manager = oci.object_storage.UploadManager(
object_storage_client=os_client,
allow_parallel_uploads=True, parallel_process_count=3)
print(f"\nUploading files to bucket {bucket.name}...")
import threading
ths = [threading.Thread(
target=upload_file_from_list_to_bucket, args=(
i, upload_manager, file_list, namespace_name, bucket_name,
object_name_prefix))
for i in range(NTHREAD)]
for th in ths:
th.daemon = True
th.start()
for th in ths:
th.join()
# for file_name in file_list:
# file_size = sizeof_fmt(os.path.getsize(file_name))
# print(f"{os.path.basename(file_name)} ({file_size}) ...", end='')
# upload_manager.upload_file(
# namespace_name=namespace_name, bucket_name=bucket.name,
# object_name=os.path.basename(file_name),
# file_path=file_name,
# progress_callback=bucket_dir_object_upload_progress_callback,
# part_size=1024*1024*1024*128)
# print("")
except Exception as e:
print(f"Could not upload all files successfully.\n"
f"ERROR: {str(e)}")
return
if len(file_list) > 1:
print(f"\nAll {len(file_list)} files uploaded successfully.\n")
else:
print(f"\nThe file was uploaded successfully.\n")
if not interactive:
return len(file_list)
@plugin_function('mds.create.bucketObjectPreauthenticatedRequest')
def create_bucket_object_par(**kwargs):
"""Get a preauthenticated request for the given bucket object
Args:
**kwargs: Additional options
Keyword Args:
bucket_object_name (str): If set to JSON, output is formatted that way.
bucket_name (str): The name of the bucket
access_type (str): The type of access to grant ('r', 'w', 'rw')
valid_till (str): The point in time until the PAR is valid using the
format YYYY-MM-DD HH:MM:SS. If not specified it is now + 7 days.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
return_object (bool): If the object should be returned
Returns:
The link or the object
"""
bucket_object_name = kwargs.get('bucket_object_name')
bucket_name = kwargs.get('bucket_name')
access_type = kwargs.get('access_type')
valid_till = kwargs.get('valid_till')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
return_object = kwargs.get('return_object', False)
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
import oci.object_storage
import oci.util
import os.path
import io
import mysqlsh
import datetime
import time
# Get bucket
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id, config=config)
if bucket is None:
print("Operation cancelled.")
return
# Get bucket object name
if bucket_object_name is None:
bucket_object_name = mysqlsh.globals.shell.prompt(
"Please enter the name of the bucket object: ",
{'defaultValue': ''}).strip()
if bucket_object_name == "":
print("Operation cancelled.")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# PAR valid till now + 7 day
if valid_till is None:
valid_till = datetime.datetime.now() + datetime.timedelta(days=7)
elif type(valid_till) is str:
try:
valid_till = datetime.datetime(*(time.strptime(
valid_till, f"%Y-%m-%d %H:%M:%S")[0:6]))
except ValueError as e:
print("Invalid datetime format. Use YYYY-MM-DD HH:MM:SS")
return
# Get access type
access_type = access_type.lower() if access_type is not None else "r"
if access_type == "w":
access = oci.object_storage.models.\
CreatePreauthenticatedRequestDetails.ACCESS_TYPE_OBJECT_WRITE
elif access_type == "rw":
access = oci.object_storage.models.\
CreatePreauthenticatedRequestDetails.\
ACCESS_TYPE_OBJECT_READ_WRITE
else:
access = oci.object_storage.models.\
CreatePreauthenticatedRequestDetails.ACCESS_TYPE_OBJECT_READ
details = oci.object_storage.models.\
CreatePreauthenticatedRequestDetails(
name=f"PAR-{bucket.name}-{bucket_object_name}-R",
object_name=bucket_object_name,
access_type=access,
time_expires=f"{valid_till:%Y-%m-%dT%H:%M:%SZ}")
par = os_client.create_preauthenticated_request(
namespace_name=namespace_name, bucket_name=bucket_name,
create_preauthenticated_request_details=details).data
if return_object:
return par
else:
return get_par_url_prefix(config) + par.access_uri
except oci.exceptions.ServiceError as e:
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
print(f'ERROR: {e}')
return
def get_par_url_prefix(config=None):
"""Get a preauthenticated request for the given bucket object
Args:
config (object): An OCI config object or None.
Returns:
The PAR URL prefix
"""
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
except ValueError as e:
print(f"ERROR: {str(e)}")
return
prefix = f"https://objectstorage.{config.get('region')}.oraclecloud.com"
return prefix
def format_pars_listing(pars):
"""Returns a formatted list of pars.
Args:
pars (list): A list of PAR objects.
Returns:
The formatted list as string
"""
import re
import math
out = ""
i = 1
for p in pars:
# Shorten to 30 chars max, remove linebreaks
name = re.sub(r'[\n\r]', ' ',
p.name[:28] + '..'
if len(p.name) > 30
else p.name)
obj_name = re.sub(r'[\n\r]', ' ',
p.object_name[:28] + '..'
if len(p.object_name) > 30
else p.object_name)
time_cr = f"{p.time_created:%Y-%m-%d %H:%M}" \
if p.time_created is not None else ""
time_ex = f"{p.time_expires:%Y-%m-%d %H:%M}" \
if p.time_expires is not None else ""
out += (f"{i:>4} {name:30} {obj_name:30} {time_cr:16} {time_ex:16}\n")
i += 1
return out
@plugin_function('mds.list.bucketObjectPreauthenticatedRequests')
def list_pars(**kwargs):
"""Lists PAR objects
Args:
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the bucket.
name (str): Name of the PAR, can include * to match multiple PARs
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
interactive (bool): If set to false exceptions are raised
return_formatted (bool): If set to true, a list object is returned.
Returns:
None
"""
bucket_name = kwargs.get('bucket_name')
name = kwargs.get('name')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
interactive = kwargs.get('interactive', True)
return_formatted = kwargs.get('return_formatted', True)
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
import oci.object_storage
import oci.util
import oci.pagination
import mysqlsh
import re
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id, config=config)
if bucket is None:
print("Operation Cancelled.\n")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# List all PARs
pars = oci.pagination.list_call_get_all_results(
os_client.list_preauthenticated_requests,
namespace_name=namespace_name,
bucket_name=bucket.name,
limit=1000).data
if name:
name = name.lower()
# Filter list if PARs
if '*' in name:
name_pattern = '^' + name.replace('*', '.+')
pars = [par for par in pars
if re.search(name_pattern, par.name.lower())]
else:
pars = [par for par in pars if name == par.name.lower()]
if return_formatted:
return format_pars_listing(pars)
else:
return oci.util.to_dict(pars)
except oci.exceptions.ServiceError as e:
if not interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except (Exception, ValueError) as e:
if not interactive:
raise
print(f'ERROR: {e}')
return
@plugin_function('mds.get.bucketObjectPreauthenticatedRequest')
def get_par(**kwargs):
"""Gets a PAR object
Args:
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the bucket.
name (str): The name of the PAR.
par_id (str): The OCID of the PAR.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
Returns:
None
"""
bucket_name = kwargs.get('bucket_name')
name = kwargs.get('name')
par_id = kwargs.get('par_id')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
import oci.object_storage
import mysqlsh
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
# If a par_id is given, look it up directly
if par_id is not None:
par = os_client.get_preauthenticated_request(
namespace_name=namespace_name, bucket_name=bucket_name,
par_id=par_id).data
return par
# If a name was given, look it up
if name:
pars = oci.pagination.list_call_get_all_results(
os_client.list_preauthenticated_requests,
namespace_name=namespace_name,
bucket_name=bucket_name,
limit=1000).data
for par in pars:
if par.name.lower() == name:
return par
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id, config=config)
if bucket is None:
print("Operation Cancelled.\n")
return
# List the bucket objects
pars = os_client.list_preauthenticated_requests(
namespace_name=namespace_name, bucket_name=bucket.name).data
# If an user_name was given not given, print the user list
print(f"Preauthenticated Requests:\n")
print(format_pars_listing(pars))
# Let the user choose from the list
par = core.prompt_for_list_item(
item_list=pars,
prompt_caption=("Please enter the index "
"of the preauthenticated request: "))
return par
except oci.exceptions.ServiceError as e:
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
print(f'ERROR: {e}')
return
@plugin_function('mds.delete.bucketObjectPreauthenticatedRequest')
def delete_par(**kwargs):
"""Deletes an object store PAR
Args:
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the bucket.
name (str): Name of the PAR, can include * to match multiple PARs
par_id (str): The OCID of the PAR.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
interactive (bool): Makes the func return true on success
Returns:
None or True
"""
bucket_name = kwargs.get('bucket_name')
name = kwargs.get('name')
par_id = kwargs.get('par_id')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
interactive = kwargs.get('interactive', True)
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
import oci.object_storage
import mysqlsh
import re
bucket = get_bucket(
bucket_name=bucket_name, compartment_id=compartment_id,
config=config)
if bucket is None:
print("Operation Cancelled.\n")
return
# Initialize the Object Store client
os_client = core.get_oci_object_storage_client(config=config)
# Get Object Store namespace
namespace_name = get_object_store_namespace(config)
if not name:
# Get the PAR
par = get_par(
bucket_name=bucket.name, name=name, par_id=par_id,
compartment_id=compartment_id, config=config)
if par is None:
print("Operation cancelled.")
return
os_client.delete_preauthenticated_request(
namespace_name=namespace_name, bucket_name=bucket.name,
par_id=par.id).data
else:
pars = oci.pagination.list_call_get_all_results(
os_client.list_preauthenticated_requests,
namespace_name=namespace_name,
bucket_name=bucket.name,
limit=1000).data
# Filter list if PARs
if name != '*':
name = name.lower()
# Filter list if PARs
if '*' in name:
name_pattern = '^' + name.replace('*', '.+')
pars = [par for par in pars
if re.search(name_pattern, par.name.lower())]
else:
pars = [par for par in pars if name == par.name.lower()]
# Prompt the user for confirmation
if interactive:
prompt = mysqlsh.globals.shell.prompt(
f"Are you sure you want to delete {len(pars)} "
f"preauthenticated request{'s' if len(pars) > 1 else ''} "
f"from {bucket.name} "
f"[yes/NO]: ",
{'defaultValue': 'no'}).strip().lower()
if prompt != "yes":
print("Deletion aborted.\n")
return
for par in pars:
os_client.delete_preauthenticated_request(
namespace_name=namespace_name, bucket_name=bucket_name,
par_id=par.id).data
if interactive:
print("Preauthenticated request deleted successfully.")
else:
return True
except oci.exceptions.ServiceError as e:
if not interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except (Exception, ValueError) as e:
if not interactive:
raise
print(f'ERROR: {e}')
return