in util/python/alibabacloud_oss_util/client.py [0:0]
def _signature_v2(req, bucket_name, access_key_secret, additional_headers):
resource = ''
pathname = '/%s%s' % (bucket_name, req.pathname) if bucket_name else req.pathname
sorted_keys = []
if req.query is not None:
sorted_keys = list(req.query)
strs = pathname.split('?')
resource += quote(strs[0], safe='')
if '?' in pathname:
sorted_keys.append(strs[1])
sorted_keys.sort()
if len(sorted_keys) > 0:
resource += '?'
for key in sorted_keys:
if req.query.get(key):
if resource.endswith('?'):
resource += '%s=%s' % (quote(key, safe=''), quote(req.query.get(key), safe=''))
else:
resource += '&%s=%s' % (quote(key, safe=''), quote(req.query.get(key), safe=''))
else:
if resource.endswith('?'):
resource += quote(key, safe='')
else:
resource += '&%s' % quote(key, safe='')
# get sign str
tmp = {}
if additional_headers is None:
headers = []
additional_headers = []
else:
headers = list(additional_headers)
for k, v in req.headers.items():
if v is not None:
if k in headers or k.lower().startswith('x-oss-'):
tmp[k.lower()] = v
sorted_keys = list(tmp)
sorted_keys.sort()
can_oss_headers = ''
for key in sorted_keys:
can_oss_headers += '%s:%s\n' % (key, tmp.get(key))
date = req.headers.get('date', '')
content_type = req.headers.get('content-type', '')
content_md5 = req.headers.get('content-md5', '')
date = date if date is not None else ''
content_type = content_type if content_type is not None else ''
content_md5 = content_md5 if content_md5 is not None else ''
sign_str = '%s\n%s\n%s\n%s\n%s%s\n%s' % (
req.method, content_md5, content_type, date, can_oss_headers, ';'.join(additional_headers), resource)
secret = access_key_secret.encode('utf-8')
sign_str = sign_str.encode('utf-8')
hash_val = hmac.new(secret, sign_str, hashlib.sha256).digest()
signature = base64.b64encode(hash_val).decode('utf-8')
return signature