in alibabacloud_oss_v2/signer/v1.py [0:0]
def _auth_query(self, signing_ctx: SigningContext) -> None:
    """Presign the request by carrying the credentials in the URL query.

    The request URL is rewritten so that its query string contains
    ``OSSAccessKeyId``, ``Expires`` (Unix timestamp, seconds), optionally
    ``security-token``, and finally ``Signature``. Any ``Signature`` or
    ``security-token`` parameter already present in the URL is discarded
    first, so the URL is signed from a clean state.

    Args:
        signing_ctx: Signing context. Reads ``request``, ``credentials``,
            ``signing_time`` and ``expiration_time``; writes ``request.url``,
            ``string_to_sign``, ``signing_time`` and ``expiration_time``.
    """
    request = signing_ctx.request
    cred = signing_ctx.credentials

    # Signing time: the caller-supplied value converted to UTC, else "now".
    if signing_ctx.signing_time is None:
        signing_time = datetime.datetime.now(datetime.timezone.utc)
    else:
        signing_time = signing_ctx.signing_time.astimezone(datetime.timezone.utc)

    # Expiration: the caller-supplied value converted to UTC, else 15 minutes
    # from the current wall clock (not from signing_time).
    if signing_ctx.expiration_time is None:
        expiration_time = datetime.datetime.now(
            datetime.timezone.utc) + datetime.timedelta(minutes=15)
    else:
        expiration_time = signing_ctx.expiration_time.astimezone(datetime.timezone.utc)
    expires = str(int(expiration_time.timestamp()))

    # Collect the existing query parameters verbatim (they are not decoded or
    # re-encoded); a repeated key keeps only its last value.
    parts = urlsplit(request.url)
    query_pairs = {}
    if parts.query:
        for pair in parts.query.split('&'):
            key, _, value = pair.partition('=')
            query_pairs[key] = value

    # Drop stale signing parameters before adding the fresh ones.
    query_pairs.pop('Signature', None)
    query_pairs.pop('security-token', None)
    query_pairs['OSSAccessKeyId'] = cred.access_key_id
    query_pairs['Expires'] = expires
    if cred.security_token is not None:
        query_pairs['security-token'] = quote(cred.security_token, safe='')

    # Parameters with an empty value are emitted as a bare key (e.g. "acl").
    unsigned_query = '&'.join(
        f'{key}={value}' if value else f'{key}'
        for key, value in query_pairs.items()
    )

    # The URL must be updated before the string to sign is computed, so the
    # request already carries every signed query parameter at that point.
    request.url = SplitResult(
        parts.scheme, parts.netloc, parts.path, unsigned_query, parts.fragment
    ).geturl()

    # string to sign
    string_to_sign = self._calc_string_to_sign(signing_ctx=signing_ctx, date=expires)

    # signature
    signature = self._calc_signature(
        access_key_secrect=cred.access_key_secret,
        string_to_sign=string_to_sign)

    # Append the signature to the query component rather than to the end of
    # the URL string: plain concatenation would place it inside the fragment
    # whenever the URL has one.
    signed_query = f'{unsigned_query}&Signature={quote(signature, safe="")}'
    request.url = SplitResult(
        parts.scheme, parts.netloc, parts.path, signed_query, parts.fragment
    ).geturl()

    signing_ctx.string_to_sign = string_to_sign
    signing_ctx.signing_time = signing_time
    signing_ctx.expiration_time = expiration_time