in alibabacloud_oss_v2/signer/v1.py [0:0]
def _calc_string_to_sign(self, signing_ctx: SigningContext, date: Optional[str] = None) -> str:
    r"""Build the string-to-sign for the request carried by ``signing_ctx``.

    Canonical Request::

        HTTP Verb + "\n" +
        Content-MD5 + "\n" +
        Content-Type + "\n" +
        Date + "\n" +
        CanonicalizedOSSHeaders +
        CanonicalizedResource

    Every line of ``CanonicalizedOSSHeaders`` is ``name:value`` terminated by
    its own ``"\n"``; when no header is selected the section is empty and the
    resource follows the Date line directly.

    Args:
        signing_ctx: Signing context. This method reads ``request`` (its
            ``url``, ``headers`` and ``method``), ``bucket``, ``key`` and
            ``sub_resource`` from it.
        date: Value to place on the Date line. When ``None``, the request's
            ``x-oss-date`` header is used, falling back to ``date`` and then
            to an empty string.

    Returns:
        The string-to-sign assembled from the request.
    """
    request = signing_ctx.request

    # Canonical URI: "/" followed by "<bucket>/" and "<key>" when they are set.
    canonical_uri = '/'
    if signing_ctx.bucket is not None:
        canonical_uri += signing_ctx.bucket + '/'
    if signing_ctx.key is not None:
        canonical_uri += signing_ctx.key

    # Canonical query: only parameters named in the signer's sub-resource set
    # or in the per-request sub_resource list take part in the signature.
    canonical_query = ''
    query = urlsplit(request.url).query
    if query:
        # bucket/key are treated as optional above; do the same for
        # sub_resource so an unset value means "no extra sub-resources"
        # instead of raising TypeError on the membership test.
        extra_sub_resources = signing_ctx.sub_resource or ()
        signed_pairs = []
        for pair in query.split('&'):
            raw_key, _, raw_value = pair.partition('=')
            key = unquote(raw_key)
            if key in self._subresource_key_set or key in extra_sub_resources:
                signed_pairs.append((key, unquote(raw_value)))
        if signed_pairs:
            # Sorted by (key, value); a parameter with an empty value is
            # emitted as the bare key, without "=".
            canonical_query = '?' + '&'.join(
                f'{key}={value}' if value else key
                for key, value in sorted(signed_pairs)
            )

    canonical_resource = canonical_uri + canonical_query

    # Canonical headers: lower-cased names accepted by _is_sign_header,
    # ordered by name, one "name:value\n" line each.
    signed_headers = []
    for name, value in request.headers.items():
        lower_name = name.lower()
        if _is_sign_header(lower_name):
            signed_headers.append((lower_name, value))
    signed_headers.sort(key=lambda item: item[0])
    canonical_headers = ''.join(
        name + ':' + value + '\n' for name, value in signed_headers
    )

    content_md5 = request.headers.get('content-md5', '')
    content_type = request.headers.get('content-type', '')
    if date is None:
        date = request.headers.get('x-oss-date', '') or request.headers.get('date', '')

    return '{}\n{}\n{}\n{}\n{}'.format(
        request.method,
        content_md5,
        content_type,
        date,
        canonical_headers + canonical_resource)