iact3/plugin/oss.py (78 lines of code) (raw):
import base64
import json
import oss2
from alibabacloud_credentials import credentials
from retrying import retry
from iact3.plugin.base_plugin import CredentialClient
def retry_on_exception(exception):
return isinstance(exception, oss2.exceptions.RequestError)
class OssPlugin:
def __init__(self, region_id: str,
bucket_name: str,
endpoint: str = None,
credential: CredentialClient = None,
**kwargs):
self.region_id = region_id
self.auth = self._get_auth(credential)
if not endpoint:
endpoint = f'https://oss-{self.region_id}.aliyuncs.com'
self.endpoint = endpoint
self.client = oss2.Bucket(
self.auth, self.endpoint, bucket_name,
app_name='iact3',
connect_timeout=30,
**kwargs)
def _get_auth(self, cred: CredentialClient = None):
cred_client = CredentialClient() if cred is None else cred
credential = cred_client.cloud_credential
if isinstance(credential, credentials.AccessKeyCredential):
auth = oss2.Auth(
credential.access_key_id,
credential.access_key_secret
)
elif isinstance(credential, credentials.StsCredential):
auth = oss2.StsAuth(
credential.access_key_id,
credential.access_key_secret,
credential.security_token
)
else:
auth = oss2.AnonymousAuth()
return auth
@staticmethod
def _encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
@retry(retry_on_exception=retry_on_exception, stop_max_attempt_number=3, wait_fixed=5)
def put_object_with_string(self, object_name: str, strings: str,
callback_params: dict = None, callback_var_params: dict = None):
params = {}
if callback_params:
params['x-oss-callback'] = self._encode_callback(callback_params)
if callback_var_params:
params['x-oss-callback-var'] = self._encode_callback(callback_var_params)
if params:
self.client.put_object(object_name, strings, params)
else:
self.client.put_object(object_name, strings)
@retry(retry_on_exception=retry_on_exception, stop_max_attempt_number=3, wait_fixed=5)
def put_local_file(self, object_name: str, local_file: str):
self.client.put_object_from_file(object_name, local_file)
@retry(retry_on_exception=retry_on_exception, stop_max_attempt_number=3, wait_fixed=5)
def object_exists(self, object_name: str):
return self.client.object_exists(object_name)
@retry(retry_on_exception=retry_on_exception, stop_max_attempt_number=3, wait_fixed=5)
def get_object_content(self, object_name: str):
return self.client.get_object(object_name)
@retry(retry_on_exception=retry_on_exception, stop_max_attempt_number=3, wait_fixed=5)
def get_object_meta(self, object_name: str):
return self.client.get_object_meta(object_name)
@retry(retry_on_exception=retry_on_exception, stop_max_attempt_number=3, wait_fixed=5)
def bucket_exist(self):
try:
self.client.get_bucket_info()
except Exception as ex:
if isinstance(ex, oss2.exceptions.NoSuchBucket):
return False
raise
return True