in alibabacloud_oss_v2/signer/v4.py [0:0]
def _calc_canonical_request(self,
                            signing_ctx: SigningContext,
                            additional_headers: Optional[Set[str]] = None
                            ) -> str:
    r"""Build the canonical request string for the given signing context.

    The result is the following fields joined by newlines:

        HTTP Verb + "\n" +
        Canonical URI + "\n" +
        Canonical Query String + "\n" +
        Canonical Headers + "\n" +
        Additional Headers + "\n" +
        Hashed PayLoad

    Args:
        signing_ctx: Signing context; this method reads ``request``
            (its ``method``, ``url`` and ``headers``), ``bucket`` and ``key``.
        additional_headers: Names of extra headers to sign. They are passed
            unchanged to ``_is_sign_header`` and listed, sorted and joined
            with ``;``, on the "Additional Headers" line. ``None`` (the
            default) or an empty collection yields an empty line.

    Returns:
        The canonical request string.
    """
    request = signing_ctx.request

    # canonical uri: "/" + bucket + "/" + key, percent-encoded except for "/"
    uri = '/'
    if signing_ctx.bucket is not None:
        uri += signing_ctx.bucket + '/'
    if signing_ctx.key is not None:
        uri += signing_ctx.key
    canonical_uri = quote(uri, safe='/')

    # canonical query: pairs are taken verbatim from the URL (no re-encoding)
    # and sorted by (key, value); a parameter with an empty value is emitted
    # as the bare key.
    canonical_query = ''
    query = urlsplit(request.url).query
    if query:
        key_val_pairs = []
        for pair in query.split('&'):
            key, _, value = pair.partition('=')
            key_val_pairs.append((key, value))
        canonical_query = '&'.join(
            f'{key}={value}' if value else key
            for key, value in sorted(key_val_pairs))

    # canonical headers: lower-cased names selected by _is_sign_header,
    # sorted by name, one "name:value\n" entry per header.
    canon_headers = []
    for name, value in request.headers.items():
        lower_name = name.lower()
        if _is_sign_header(lower_name, additional_headers):
            canon_headers.append((lower_name, value))
    canon_headers.sort(key=lambda item: item[0])
    canonical_headers = ''.join(
        name + ':' + value + '\n' for name, value in canon_headers)

    # canonical additional headers: the parameter defaults to None, which
    # str.join cannot iterate, so treat it as "no additional headers". The
    # names are sorted because the iteration order of a set is not
    # guaranteed, and the signed string must be reproducible.
    if additional_headers:
        canonical_additional_headers = ';'.join(sorted(additional_headers))
    else:
        canonical_additional_headers = ''

    # hash payload: use the declared content hash when the header is present
    hash_payload = request.headers.get(
        'x-oss-content-sha256', 'UNSIGNED-PAYLOAD')

    return (f'{request.method}\n{canonical_uri}\n{canonical_query}\n'
            f'{canonical_headers}\n{canonical_additional_headers}\n'
            f'{hash_payload}')