def _calc_canonical_request()

in alibabacloud_oss_v2/signer/v4.py [0:0]


    def _calc_canonical_request(self,
                                signing_ctx: SigningContext,
                                additional_headers: Optional[Set[str]] = None
                                ) -> str:
        """
        Canonical Request
            HTTP Verb + "\n" +
            Canonical URI + "\n" +
            Canonical Query String + "\n" +
            Canonical Headers + "\n" +
            Additional Headers + "\n" +
            Hashed PayLoad
        """
        request = signing_ctx.request

        # canonical uri
        uri = '/'
        if signing_ctx.bucket is not None:
            uri = uri + signing_ctx.bucket + '/'
        if signing_ctx.key is not None:
            uri = uri + signing_ctx.key
        canonical_uri = quote(uri, safe='/')

        # canonical query
        canonical_query = ''
        parts = urlsplit(request.url)
        if parts.query:
            key_val_pairs = []
            for pair in parts.query.split('&'):
                key, _, value = pair.partition('=')
                key_val_pairs.append((key, value))
            sorted_key_vals = []
            for key, value in sorted(key_val_pairs):
                if len(value) > 0:
                    sorted_key_vals.append(f'{key}={value}')
                else:
                    sorted_key_vals.append(f'{key}')
            canonical_query = '&'.join(sorted_key_vals)

        # canonical headers
        canon_headers = []
        for k, v in request.headers.items():
            lower_key = k.lower()
            if _is_sign_header(lower_key, additional_headers):
                canon_headers.append((lower_key, v))
        canon_headers.sort(key=lambda x: x[0])
        canonical_headers = ''.join(
            v[0] + ':' + v[1] + '\n' for v in canon_headers)

        # canonical additional Headers
        canonical_additional_headers = ';'.join(additional_headers)

        # hash payload
        hash_payload = request.headers.get(
            'x-oss-content-sha256', 'UNSIGNED-PAYLOAD')

        return f'{request.method}\n{canonical_uri}\n{canonical_query}\n{canonical_headers}\n{canonical_additional_headers}\n{hash_payload}'