def _decrypt_data_key()

in src/aws_encryption_sdk/key_providers/kms.py [0:0]


    def _decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context=None):
        """Decrypt an encrypted data key with AWS KMS and return the plaintext data key.

        The encrypted data key's provider info must be an AWS KMS ARN whose resource
        type is ``key``. After that check (and ``_validate_allowed_to_decrypt``), AWS KMS
        is called and its response is validated: the returned ``KeyId`` must equal this
        master key's configured key id, and the returned ``Plaintext`` must have the
        length required by ``algorithm``.

        :param encrypted_data_key: Encrypted data key
        :type encrypted_data_key: aws_encryption_sdk.structures.EncryptedDataKey
        :param algorithm: Algorithm suite; its ``data_key_len`` is the expected plaintext length
        :type algorithm: aws_encryption_sdk.identifiers.Algorithm
        :param dict encryption_context: Encryption context to use in decryption
        :returns: Decrypted data key
        :rtype: aws_encryption_sdk.structures.DataKey
        :raises DecryptKeyError: if the provider info is not a key ARN, if the AWS KMS call
            fails or its response lacks ``KeyId``/``Plaintext``, if AWS KMS reports a different
            key id than the configured one, or if the plaintext length is wrong
        """
        # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.9
        # //# Additionally each provider info MUST be a valid AWS KMS ARN
        # //# (aws-kms-key-arn.md#a-valid-aws-kms-arn) with a resource type of
        # //# "key".
        edk_key_id = to_str(encrypted_data_key.key_provider.key_info)
        edk_arn = arn_from_str(edk_key_id)
        if edk_arn.resource_type != "key":
            error_message = "AWS KMS Provider EDK contains unexpected key_id: {key_id}".format(key_id=edk_key_id)
            # No exception is being handled here, so log with error() rather than
            # exception() (which would attach an empty "NoneType: None" traceback).
            _LOGGER.error(error_message)
            raise DecryptKeyError(error_message)

        self._validate_allowed_to_decrypt(edk_key_id)
        kms_params = self._build_decrypt_request(encrypted_data_key, encryption_context)
        # Catch any boto3 errors and normalize to expected DecryptKeyError.
        # DecryptKeyError raised by the validation below is not caught by this
        # handler and propagates to the caller with its specific message.
        try:
            response = self.config.client.decrypt(**kms_params)

            # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.9
            # //# If the call succeeds then the response's "KeyId" MUST be equal to the
            # //# configured AWS KMS key identifier otherwise the function MUST collect
            # //# an error.
            # Note that Python logs but does not collect errors
            returned_key_id = response["KeyId"]
            if returned_key_id != self._key_id:
                error_message = "AWS KMS returned unexpected key_id {returned} (expected {key_id})".format(
                    returned=returned_key_id, key_id=self._key_id
                )
                _LOGGER.error(error_message)
                raise DecryptKeyError(error_message)

            # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.9
            # //# The response's "Plaintext"'s length MUST equal the length
            # //# required by the requested algorithm suite otherwise the function MUST
            # //# collect an error.
            # Note that Python logs but does not collect errors
            plaintext = response["Plaintext"]
            if len(plaintext) != algorithm.data_key_len:
                error_message = "Plaintext length ({len1}) does not match algorithm's expected length ({len2})".format(
                    len1=len(plaintext), len2=algorithm.data_key_len
                )
                _LOGGER.error(error_message)
                raise DecryptKeyError(error_message)

        except (ClientError, KeyError):
            # ClientError: the AWS KMS call itself failed.
            # KeyError: the response is missing "KeyId" or "Plaintext".
            error_message = "Master Key {key_id} unable to decrypt data key".format(key_id=self._key_id)
            # Inside a handler, exception() records the active traceback.
            _LOGGER.exception(error_message)
            raise DecryptKeyError(error_message)
        return DataKey(
            key_provider=self.key_provider, data_key=plaintext, encrypted_data_key=encrypted_data_key.encrypted_data_key
        )