def _generate_data_key()

in src/aws_encryption_sdk/key_providers/kms.py [0:0]


    def _generate_data_key(self, algorithm, encryption_context=None):
        """Ask KMS for a new data key and package the result as a DataKey.

        :param algorithm: Algorithm on which to base data key
        :type algorithm: aws_encryption_sdk.identifiers.Algorithm
        :param dict encryption_context: Encryption context to pass to KMS
        :returns: Generated data key, holding both the plaintext and the encrypted key
        :rtype: aws_encryption_sdk.structures.DataKey
        :raises GenerateKeyError: if the KMS call raises ``ClientError``, the response
            lacks an expected field, or the returned key ID is not a well-formed ARN
        """
        request_kwargs = self._build_generate_data_key_request(algorithm, encryption_context)

        # The field lookups stay inside the try so that a response missing any of them
        # (KeyError) is reported the same way as a failed boto3 call (ClientError).
        try:
            kms_response = self.config.client.generate_data_key(**request_kwargs)
            # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.10
            # //# The response's "Plaintext" MUST be the plaintext in the output.
            plaintext_key = kms_response["Plaintext"]
            # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.10
            # //# The response's cipher text blob MUST be used as the returned as the
            # //# ciphertext for the encrypted data key in the output.
            encrypted_key = kms_response["CiphertextBlob"]
            kms_key_id = kms_response["KeyId"]
        except (ClientError, KeyError):
            failure = "Master Key {key_id} unable to generate data key".format(key_id=self._key_id)
            _LOGGER.exception(failure)
            raise GenerateKeyError(failure)

        # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.10
        # //# The response's "KeyId" MUST be valid.
        # The parsed ARN itself is not needed: arn_from_str is called only because it
        # errors on an invalid key ARN.
        try:
            arn_from_str(to_str(kms_key_id))
        except MalformedArnError:
            failure = "Retrieved an unexpected KeyID in response from KMS: {key_id}".format(key_id=kms_key_id)
            _LOGGER.exception(failure)
            raise GenerateKeyError(failure)

        # The key ID is passed on exactly as KMS returned it, not the to_str() form.
        provider_info = MasterKeyInfo(provider_id=self.provider_id, key_info=kms_key_id)
        return DataKey(key_provider=provider_info, data_key=plaintext_key, encrypted_data_key=encrypted_key)