in alibabacloud_oss_v2/signer/v4.py [0:0]
def _auth_query(self, signing_ctx: SigningContext) -> None:
    """Sign the request by writing the signature into the URL query string.

    Rewrites ``signing_ctx.request.url`` so that its query carries
    ``x-oss-signature-version``, ``x-oss-date``, ``x-oss-expires`` and
    ``x-oss-credential``, plus ``x-oss-security-token`` when the credentials
    have a security token and ``x-oss-additional-headers`` when there are
    additional headers to sign. ``x-oss-signature`` is appended last.

    Args:
        signing_ctx: Signing context. Reads ``request``, ``credentials``,
            ``region``, ``product``, ``additional_headers``,
            ``signing_time`` and ``expiration_time``. On return,
            ``string_to_sign``, ``signing_time`` and ``expiration_time``
            hold the values that were actually used (both times in UTC).
    """
    request = signing_ctx.request
    cred = signing_ctx.credentials
    utc = datetime.timezone.utc

    # Date
    if signing_ctx.signing_time is None:
        datetime_now = datetime.datetime.now(utc)
    else:
        datetime_now = signing_ctx.signing_time.astimezone(utc)
    if signing_ctx.expiration_time is None:
        # The default 15-minute lifetime is measured from the signing time,
        # not from a second wall-clock read, so that an explicitly supplied
        # signing_time still yields x-oss-expires == 900.
        expiration_time = datetime_now + datetime.timedelta(minutes=15)
    else:
        expiration_time = signing_ctx.expiration_time.astimezone(utc)
    datetime_now_iso8601 = datetime_now.strftime('%Y%m%dT%H%M%SZ')
    date_now_iso8601 = datetime_now_iso8601[:8]
    expires = int((expiration_time - datetime_now).total_seconds())

    # Scope
    region = signing_ctx.region or ''
    product = signing_ctx.product or ''
    scope = self._build_scope(
        date=date_now_iso8601, region=region, product=product)

    # Additional headers
    additional_headers = self._common_additional_headers(
        request, signing_ctx.additional_headers)

    # Existing query parameters are kept as they appear in the URL (already
    # encoded). A key that occurs more than once keeps only its last value.
    encoded_pairs = {}
    parts = urlsplit(request.url)
    if parts.query:
        for pair in parts.query.split('&'):
            key, _, value = pair.partition('=')
            encoded_pairs[key] = value

    # Drop signing parameters left over from a previous signing pass; they
    # are re-added below only when they still apply.
    encoded_pairs.pop('x-oss-signature', None)
    encoded_pairs.pop('x-oss-security-token', None)
    encoded_pairs.pop('x-oss-additional-headers', None)

    # Credentials information
    encoded_pairs.update(
        {
            'x-oss-signature-version': 'OSS4-HMAC-SHA256',
            'x-oss-date': datetime_now_iso8601,
            'x-oss-expires': str(expires),
            'x-oss-credential': quote(f'{cred.access_key_id}/{scope}', safe=''),
        }
    )
    if cred.security_token is not None:
        encoded_pairs['x-oss-security-token'] = quote(
            cred.security_token, safe='')
    if additional_headers:
        encoded_pairs['x-oss-additional-headers'] = quote(
            ';'.join(additional_headers), safe='')

    # Parameters with an empty value are written as a bare key (no '=').
    query = [
        f'{key}={value}' if value else f'{key}'
        for key, value in encoded_pairs.items()
    ]
    parts = SplitResult(parts.scheme, parts.netloc,
                        parts.path, '&'.join(query), parts.fragment)
    # The URL must be updated before the canonical request is computed,
    # because the canonical request is derived from signing_ctx.
    request.url = parts.geturl()

    # Canonical request
    canonical_request = self._calc_canonical_request(
        signing_ctx=signing_ctx, additional_headers=additional_headers)

    # String to sign
    string_to_sign = self._calc_string_to_sign(
        datetime_now_iso8601, scope, canonical_request)

    # Signature
    signature = self._calc_signature(
        access_key_secrect=cred.access_key_secret,
        date=date_now_iso8601,
        region=region,
        product=product,
        string_to_sign=string_to_sign)

    # Insert the signature at the end of the query but before any
    # '#fragment', so it is actually transmitted to the server.
    quoted_signature = quote(signature, safe='')
    base, hash_sep, fragment = request.url.partition('#')
    request.url = f'{base}&x-oss-signature={quoted_signature}{hash_sep}{fragment}'

    signing_ctx.string_to_sign = string_to_sign
    signing_ctx.signing_time = datetime_now
    signing_ctx.expiration_time = expiration_time