in src/aws_encryption_sdk/key_providers/kms.py [0:0]
def _encrypt_data_key(self, data_key, algorithm, encryption_context=None):
"""Encrypts a data key and returns the ciphertext.
:param data_key: Unencrypted data key
:type data_key: :class:`aws_encryption_sdk.structures.RawDataKey`
or :class:`aws_encryption_sdk.structures.DataKey`
:param algorithm: Placeholder to maintain API compatibility with parent
:param dict encryption_context: Encryption context to pass to KMS
:returns: Data key containing encrypted data key
:rtype: aws_encryption_sdk.structures.EncryptedDataKey
:raises EncryptKeyError: if Master Key is unable to encrypt data key
"""
kms_params = self._build_encrypt_request(data_key, encryption_context)
# Catch any boto3 errors and normalize to expected EncryptKeyError
try:
response = self.config.client.encrypt(**kms_params)
# //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.11
# //# The response's cipher text blob MUST be used as the "ciphertext" for the
# //# encrypted data key.
ciphertext = response["CiphertextBlob"]
key_id = response["KeyId"]
except (ClientError, KeyError):
error_message = "Master Key {key_id} unable to encrypt data key".format(key_id=self._key_id)
_LOGGER.exception(error_message)
raise EncryptKeyError(error_message)
# //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.11
# //# The AWS KMS Encrypt response MUST contain a valid "KeyId".
# arn_from_str will error if given an invalid key ARN
try:
key_id_str = to_str(key_id)
arn_from_str(key_id_str)
except MalformedArnError:
error_message = "Retrieved an unexpected KeyID in response from KMS: {key_id}".format(key_id=key_id)
_LOGGER.exception(error_message)
raise EncryptKeyError(error_message)
return EncryptedDataKey(
key_provider=MasterKeyInfo(provider_id=self.provider_id, key_info=key_id), encrypted_data_key=ciphertext
)