botocore/crt/auth.py [66:145]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            signature_type=self._SIGNATURE_TYPE,
            credentials_provider=credentials_provider,
            region=self._region_name,
            service=self._service_name,
            date=datetime_now,
            should_sign_header=self._should_sign_header,
            use_double_uri_encode=self._USE_DOUBLE_URI_ENCODE,
            should_normalize_uri_path=self._SHOULD_NORMALIZE_URI_PATH,
            signed_body_value=explicit_payload,
            signed_body_header_type=body_header,
            expiration_in_seconds=self._expiration_in_seconds,
        )
        crt_request = self._crt_request_from_aws_request(request)
        future = awscrt.auth.aws_sign_request(crt_request, signing_config)
        future.result()
        self._apply_signing_changes(request, crt_request)

    def _crt_request_from_aws_request(self, aws_request):
        url_parts = urlsplit(aws_request.url)
        crt_path = url_parts.path if url_parts.path else '/'
        if aws_request.params:
            array = []
            for (param, value) in aws_request.params.items():
                value = str(value)
                array.append('%s=%s' % (param, value))
            crt_path = crt_path + '?' + '&'.join(array)
        elif url_parts.query:
            crt_path = '%s?%s' % (crt_path, url_parts.query)

        crt_headers = awscrt.http.HttpHeaders(aws_request.headers.items())

        # CRT requires body (if it exists) to be an I/O stream.
        crt_body_stream = None
        if aws_request.body:
            if hasattr(aws_request.body, 'seek'):
                crt_body_stream = aws_request.body
            else:
                crt_body_stream = BytesIO(aws_request.body)

        crt_request = awscrt.http.HttpRequest(
            method=aws_request.method,
            path=crt_path,
            headers=crt_headers,
            body_stream=crt_body_stream)
        return crt_request

    def _apply_signing_changes(self, aws_request, signed_crt_request):
        # Apply changes from signed CRT request to the AWSRequest
        aws_request.headers = HTTPHeaders.from_pairs(
            list(signed_crt_request.headers))

    def _should_sign_header(self, name, **kwargs):
        return name.lower() not in SIGNED_HEADERS_BLACKLIST

    def _modify_request_before_signing(self, request):
        # This could be a retry. Make sure the previous
        # authorization headers are removed first.
        for h in self._PRESIGNED_HEADERS_BLOCKLIST:
            if h in request.headers:
                del request.headers[h]
        # If necessary, add the host header
        if 'host' not in request.headers:
            request.headers['host'] = _host_from_url(request.url)

    def _get_existing_sha256(self, request):
        return request.headers.get('X-Amz-Content-SHA256')

    def _should_sha256_sign_payload(self, request):
        # Payloads will always be signed over insecure connections.
        if not request.url.startswith('https'):
            return True

        # Certain operations may have payload signing disabled by default.
        # Since we don't have access to the operation model, we pass in this
        # bit of metadata through the request context.
        return request.context.get('payload_signing_enabled', True)

    def _should_add_content_sha256_header(self, explicit_payload):
        # only add X-Amz-Content-SHA256 header if payload is explicitly set
        return explicit_payload is not None
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



botocore/crt/auth.py [249:328]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            signature_type=self._SIGNATURE_TYPE,
            credentials_provider=credentials_provider,
            region=self._region_name,
            service=self._service_name,
            date=datetime_now,
            should_sign_header=self._should_sign_header,
            use_double_uri_encode=self._USE_DOUBLE_URI_ENCODE,
            should_normalize_uri_path=self._SHOULD_NORMALIZE_URI_PATH,
            signed_body_value=explicit_payload,
            signed_body_header_type=body_header,
            expiration_in_seconds=self._expiration_in_seconds,
        )
        crt_request = self._crt_request_from_aws_request(request)
        future = awscrt.auth.aws_sign_request(crt_request, signing_config)
        future.result()
        self._apply_signing_changes(request, crt_request)

    def _crt_request_from_aws_request(self, aws_request):
        url_parts = urlsplit(aws_request.url)
        crt_path = url_parts.path if url_parts.path else '/'
        if aws_request.params:
            array = []
            for (param, value) in aws_request.params.items():
                value = str(value)
                array.append('%s=%s' % (param, value))
            crt_path = crt_path + '?' + '&'.join(array)
        elif url_parts.query:
            crt_path = '%s?%s' % (crt_path, url_parts.query)

        crt_headers = awscrt.http.HttpHeaders(aws_request.headers.items())

        # CRT requires body (if it exists) to be an I/O stream.
        crt_body_stream = None
        if aws_request.body:
            if hasattr(aws_request.body, 'seek'):
                crt_body_stream = aws_request.body
            else:
                crt_body_stream = BytesIO(aws_request.body)

        crt_request = awscrt.http.HttpRequest(
            method=aws_request.method,
            path=crt_path,
            headers=crt_headers,
            body_stream=crt_body_stream)
        return crt_request

    def _apply_signing_changes(self, aws_request, signed_crt_request):
        # Apply changes from signed CRT request to the AWSRequest
        aws_request.headers = HTTPHeaders.from_pairs(
            list(signed_crt_request.headers))

    def _should_sign_header(self, name, **kwargs):
        return name.lower() not in SIGNED_HEADERS_BLACKLIST

    def _modify_request_before_signing(self, request):
        # This could be a retry. Make sure the previous
        # authorization headers are removed first.
        for h in self._PRESIGNED_HEADERS_BLOCKLIST:
            if h in request.headers:
                del request.headers[h]
        # If necessary, add the host header
        if 'host' not in request.headers:
            request.headers['host'] = _host_from_url(request.url)

    def _get_existing_sha256(self, request):
        return request.headers.get('X-Amz-Content-SHA256')

    def _should_sha256_sign_payload(self, request):
        # Payloads will always be signed over insecure connections.
        if not request.url.startswith('https'):
            return True

        # Certain operations may have payload signing disabled by default.
        # Since we don't have access to the operation model, we pass in this
        # bit of metadata through the request context.
        return request.context.get('payload_signing_enabled', True)

    def _should_add_content_sha256_header(self, explicit_payload):
        # only add X-Amz-Content-SHA256 header if payload is explicitly set
        return explicit_payload is not None
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



