in src/aws_encryption_sdk/internal/formatting/deserialize.py [0:0]
def deserialize_encrypted_data_keys(stream, max_encrypted_data_keys=None):
    # type: (IO, Union[int, None]) -> Set[EncryptedDataKey]
    """Deserialize some encrypted data keys from a stream.

    The stream is consumed in this order, every length/count being a big-endian
    unsigned 16-bit integer (``">H"``):

    #. the number of encrypted data keys
    #. for each key: provider ID length, provider ID bytes, provider info length,
       provider info bytes, encrypted data key length, encrypted data key bytes

    :param stream: Stream from which to read encrypted data keys
    :param max_encrypted_data_keys: Maximum number of encrypted data keys to deserialize.
        A falsy value (``None`` or ``0``) disables the limit.
    :return: Loaded encrypted data keys
    :rtype: set of :class:`EncryptedDataKey`
    :raises MaxEncryptedDataKeysExceeded: if the count declared in the stream is greater
        than ``max_encrypted_data_keys``
    """
    (encrypted_data_key_count,) = unpack_values(">H", stream)

    # Enforce the limit before reading any key material so an oversized header is rejected early.
    if max_encrypted_data_keys and encrypted_data_key_count > max_encrypted_data_keys:
        raise MaxEncryptedDataKeysExceeded(encrypted_data_key_count, max_encrypted_data_keys)

    encrypted_data_keys = set()
    for _ in range(encrypted_data_key_count):
        (key_provider_length,) = unpack_values(">H", stream)
        (key_provider_identifier,) = unpack_values(">{}s".format(key_provider_length), stream)
        (key_provider_information_length,) = unpack_values(">H", stream)
        (key_provider_information,) = unpack_values(">{}s".format(key_provider_information_length), stream)
        (encrypted_data_key_length,) = unpack_values(">H", stream)
        # Read the key through unpack_values, like the other length-prefixed fields, instead of a bare
        # stream.read(): read() may return fewer bytes than requested at end of stream, which would
        # silently yield a truncated encrypted data key.
        # NOTE(review): relies on unpack_values rejecting a short read, as it must for the fields above.
        (encrypted_data_key,) = unpack_values(">{}s".format(encrypted_data_key_length), stream)

        # ESDK-Python <4.0.1 incorrectly computed the key provider length for non-ASCII key provider IDs.
        # The length in the header was computed as the length of the key provider ID as a string instead of
        # the length of the key provider ID as UTF-8 bytes.
        # If a non-ASCII key provider ID were supplied, the key provider ID's UTF-8 bytes written to the header
        # would be truncated, and attempting to decrypt the message would result in a deserialization error.
        # That error would be raised when calling `to_str(key_provider_identifier)` below.
        # An impacted message can be decrypted by replacing the truncated provider ID with the expected provider ID
        # in decryption code.
        # Contact AWS for any questions about this approach.
        # ESDK-Python >=4.0.1 corrects the serialization logic and writes the correct length and expected bytes
        # to the message header.
        encrypted_data_keys.add(
            EncryptedDataKey(
                key_provider=MasterKeyInfo(
                    provider_id=to_str(key_provider_identifier), key_info=key_provider_information
                ),
                encrypted_data_key=encrypted_data_key,
            )
        )
    return encrypted_data_keys