in botocore/crt/auth.py [0:0]
def add_auth(self, request):
if self.credentials is None:
raise NoCredentialsError()
# Use utcnow() because that's what gets mocked by tests, but set
# timezone because CRT assumes naive datetime is local time.
datetime_now = datetime.datetime.utcnow().replace(
tzinfo=datetime.timezone.utc)
# Use existing 'X-Amz-Content-SHA256' header if able
existing_sha256 = self._get_existing_sha256(request)
self._modify_request_before_signing(request)
credentials_provider = awscrt.auth.AwsCredentialsProvider.new_static(
access_key_id=self.credentials.access_key,
secret_access_key=self.credentials.secret_key,
session_token=self.credentials.token)
if self._should_sha256_sign_payload(request):
if existing_sha256:
explicit_payload = existing_sha256
else:
explicit_payload = None # to be calculated during signing
else:
explicit_payload = UNSIGNED_PAYLOAD
if self._should_add_content_sha256_header(explicit_payload):
body_header = \
awscrt.auth.AwsSignedBodyHeaderType.X_AMZ_CONTENT_SHA_256
else:
body_header = awscrt.auth.AwsSignedBodyHeaderType.NONE
signing_config = awscrt.auth.AwsSigningConfig(
algorithm=awscrt.auth.AwsSigningAlgorithm.V4,
signature_type=self._SIGNATURE_TYPE,
credentials_provider=credentials_provider,
region=self._region_name,
service=self._service_name,
date=datetime_now,
should_sign_header=self._should_sign_header,
use_double_uri_encode=self._USE_DOUBLE_URI_ENCODE,
should_normalize_uri_path=self._SHOULD_NORMALIZE_URI_PATH,
signed_body_value=explicit_payload,
signed_body_header_type=body_header,
expiration_in_seconds=self._expiration_in_seconds,
)
crt_request = self._crt_request_from_aws_request(request)
future = awscrt.auth.aws_sign_request(crt_request, signing_config)
future.result()
self._apply_signing_changes(request, crt_request)