in src/aws_encryption_sdk/internal/formatting/deserialize.py [0:0]
def _deserialize_header_v1(header, tee_stream, max_encrypted_data_keys):
    # type: (dict, IO, Union[int, None]) -> MessageHeader
    """Deserialize the header from a source stream in SerializationVersion.V1.

    Fields are read from ``tee_stream`` strictly in the order below; each
    parsed value is stored into ``header`` (which is mutated in place) and the
    completed dictionary is then expanded into a ``MessageHeader``.

    :param header: A dictionary in which to store deserialized values
    :type header: dict
    :param tee_stream: The stream from which to read bytes
    :type tee_stream: aws_encryption_sdk.internal.utils.streams.TeeStream
    :param max_encrypted_data_keys: Maximum number of encrypted keys to deserialize
    :type max_encrypted_data_keys: None or positive int
    :returns: Deserialized MessageHeader object
    :rtype: :class:`aws_encryption_sdk.structures.MessageHeader`
    :raises NotSupportedError: if unsupported data types are found
    :raises UnknownIdentityError: if unknown data types are found
    :raises SerializationError: if IV length does not match algorithm
    """
    _LOGGER.debug("Deserializing header in version V1")

    # NOTE: every read below consumes bytes from the stream, so the statement
    # order defines the parsing order and must not be rearranged.
    # The format strings look like ``struct`` notation (">" big-endian,
    # "B"/"H"/"I" unsigned ints, "16s" 16 raw bytes) -- confirm against
    # ``unpack_values``.
    (message_type_id,) = unpack_values(">B", tee_stream)
    header["type"] = _verified_message_type_from_id(message_type_id)

    # Algorithm ID, message ID and the serialized encryption context length
    # are unpacked together in a single read.
    algorithm_id, message_id, ser_encryption_context_length = unpack_values(">H16sH", tee_stream)
    header["algorithm"] = _verified_algorithm_from_id(algorithm_id)
    header["message_id"] = message_id

    # The encryption context is handed over as raw bytes of the length that
    # was just read from the stream.
    header["encryption_context"] = deserialize_encryption_context(tee_stream.read(ser_encryption_context_length))

    # The encrypted data keys are parsed directly from the stream by the
    # helper, which also receives the caller-supplied key-count limit.
    header["encrypted_data_keys"] = deserialize_encrypted_data_keys(tee_stream, max_encrypted_data_keys)

    (content_type_id,) = unpack_values(">B", tee_stream)
    header["content_type"] = _verified_content_type_from_id(content_type_id)

    (content_aad_length,) = unpack_values(">I", tee_stream)
    header["content_aad_length"] = _verified_content_aad_length(content_aad_length)

    # The IV length is checked against the algorithm parsed above.
    (iv_length,) = unpack_values(">B", tee_stream)
    header["header_iv_length"] = _verified_iv_length(iv_length, header["algorithm"])

    # The frame length is checked against the content type parsed above.
    (frame_length,) = unpack_values(">I", tee_stream)
    header["frame_length"] = _verified_frame_length(frame_length, header["content_type"])

    return MessageHeader(**header)