def _auth_query()

in alibabacloud_oss_v2/signer/v4.py [0:0]


    def _auth_query(self, signing_ctx: SigningContext) -> None:
        """Presign the request by placing the V4 signing parameters in its URL query.

        Rewrites ``signing_ctx.request.url`` so that its query string carries
        the ``x-oss-*`` signing parameters followed by ``x-oss-signature``,
        then records the string to sign, the signing time and the expiration
        time on ``signing_ctx``.

        Args:
            signing_ctx: Signing context supplying the request, credentials,
                region, product, additional headers and the optional
                signing/expiration times. It is mutated in place.
        """
        request = signing_ctx.request
        cred = signing_ctx.credentials

        # Signing time: the caller-supplied value converted to UTC, else now.
        if signing_ctx.signing_time is None:
            datetime_now = datetime.datetime.now(datetime.timezone.utc)
        else:
            datetime_now = signing_ctx.signing_time.astimezone(
                datetime.timezone.utc)

        # The default expiration is anchored to the signing time (not to a
        # second clock read) so that 'x-oss-expires' is a stable 15 minutes
        # even when the caller supplies an explicit signing time.
        if signing_ctx.expiration_time is None:
            expiration_time = datetime_now + datetime.timedelta(minutes=15)
        else:
            expiration_time = signing_ctx.expiration_time.astimezone(
                datetime.timezone.utc)

        datetime_now_iso8601 = datetime_now.strftime('%Y%m%dT%H%M%SZ')
        date_now_iso8601 = datetime_now_iso8601[:8]
        expires = int((expiration_time - datetime_now).total_seconds())

        # Scope
        region = signing_ctx.region or ''
        product = signing_ctx.product or ''
        scope = self._build_scope(
            date=date_now_iso8601, region=region, product=product)

        # Additional headers
        additional_headers = self._common_additional_headers(
            request, signing_ctx.additional_headers)

        # Collect the query parameters already on the URL. Their keys and
        # values are copied through verbatim (presumably already
        # percent-encoded by the caller -- TODO confirm).
        parts = urlsplit(request.url)
        encoded_pairs = {}
        for pair in parts.query.split('&'):
            if not pair:
                # Skip empty segments (empty query, trailing '&', or '&&') so
                # that no empty key is emitted back into the URL.
                continue
            key, _, value = pair.partition('=')
            encoded_pairs[key] = value

        # Drop signing parameters left over from a previous signing pass; the
        # ones that still apply are added again below.
        for stale_key in ('x-oss-signature',
                          'x-oss-security-token',
                          'x-oss-additional-headers'):
            encoded_pairs.pop(stale_key, None)

        encoded_pairs.update(
            {
                'x-oss-signature-version': 'OSS4-HMAC-SHA256',
                'x-oss-date': datetime_now_iso8601,
                'x-oss-expires': str(expires),
                'x-oss-credential': quote(f'{cred.access_key_id}/{scope}', safe=''),
            }
        )
        if cred.security_token is not None:
            encoded_pairs['x-oss-security-token'] = quote(
                cred.security_token, safe='')

        if additional_headers:
            encoded_pairs['x-oss-additional-headers'] = quote(
                ';'.join(additional_headers), safe='')

        # Parameters with an empty value are written as a bare key.
        unsigned_query = '&'.join(
            f'{key}={value}' if value else f'{key}'
            for key, value in encoded_pairs.items())

        # The URL must carry the unsigned query before the canonical request
        # is computed from the signing context.
        request.url = SplitResult(parts.scheme, parts.netloc, parts.path,
                                  unsigned_query, parts.fragment).geturl()

        # Canonical request
        canonical_request = self._calc_canonical_request(
            signing_ctx=signing_ctx, additional_headers=additional_headers)

        # String to sign
        string_to_sign = self._calc_string_to_sign(
            datetime_now_iso8601, scope, canonical_request)

        # Signature
        signature = self._calc_signature(
            access_key_secrect=cred.access_key_secret,
            date=date_now_iso8601,
            region=region,
            product=product,
            string_to_sign=string_to_sign)

        # Rebuild the URL rather than appending to the string, so that the
        # signature stays in the query component even when the URL has a
        # '#fragment' suffix.
        encoded_signature = quote(signature, safe='')
        signed_query = f'{unsigned_query}&x-oss-signature={encoded_signature}'
        request.url = SplitResult(parts.scheme, parts.netloc, parts.path,
                                  signed_query, parts.fragment).geturl()

        signing_ctx.string_to_sign = string_to_sign
        signing_ctx.signing_time = datetime_now
        signing_ctx.expiration_time = expiration_time