def _deserialize_header_v1()

in src/aws_encryption_sdk/internal/formatting/deserialize.py [0:0]


def _deserialize_header_v1(header, tee_stream, max_encrypted_data_keys):
    # type: (dict, IO, Union[int, None]) -> MessageHeader
    """Deserializes the header from a source stream in SerializationVersion.V1.

    Fields are consumed from ``tee_stream`` strictly in the order in which they
    are read below, so the statement order in this function must not be changed.
    ``header`` is updated in place with each deserialized value and is then
    expanded as keyword arguments into the returned ``MessageHeader``; any keys
    already present in ``header`` on entry are passed through as well.

    :param header: A dictionary in which to store deserialized values
    :type header: dict
    :param tee_stream: The stream from which to read bytes
    :type tee_stream: aws_encryption_sdk.internal.utils.streams.TeeStream
    :param max_encrypted_data_keys: Maximum number of encrypted keys to deserialize
    :type max_encrypted_data_keys: None or positive int
    :returns: Deserialized MessageHeader object
    :rtype: :class:`aws_encryption_sdk.structures.MessageHeader`
    :raises NotSupportedError: if unsupported data types are found
    :raises UnknownIdentityError: if unknown data types are found
    :raises SerializationError: if IV length does not match algorithm
    """
    _LOGGER.debug("Deserializing header in version V1")

    # NOTE(review): the format strings passed to unpack_values look like
    # ``struct`` formats (">" big-endian, B = 1 byte, H = 2 bytes, I = 4 bytes,
    # 16s = 16 raw bytes); unpack_values is defined elsewhere -- confirm there.

    # Message type is read and verified on its own before any further bytes
    # are consumed from the stream.
    (message_type_id,) = unpack_values(">B", tee_stream)
    header["type"] = _verified_message_type_from_id(message_type_id)

    # Algorithm ID, message ID and the length of the serialized encryption
    # context are read together in a single call.
    algorithm_id, message_id, ser_encryption_context_length = unpack_values(">H16sH", tee_stream)

    header["algorithm"] = _verified_algorithm_from_id(algorithm_id)
    header["message_id"] = message_id

    # The encryption context is length-prefixed: read exactly the announced
    # number of bytes and hand them to the dedicated deserializer.
    header["encryption_context"] = deserialize_encryption_context(tee_stream.read(ser_encryption_context_length))

    # The encrypted data keys are parsed directly from the stream by their own
    # deserializer, which also receives the caller-supplied key-count limit.
    header["encrypted_data_keys"] = deserialize_encrypted_data_keys(tee_stream, max_encrypted_data_keys)

    (content_type_id,) = unpack_values(">B", tee_stream)
    header["content_type"] = _verified_content_type_from_id(content_type_id)

    (content_aad_length,) = unpack_values(">I", tee_stream)
    header["content_aad_length"] = _verified_content_aad_length(content_aad_length)

    # IV length is checked against the algorithm resolved earlier, so this
    # must run after header["algorithm"] has been set.
    (iv_length,) = unpack_values(">B", tee_stream)
    header["header_iv_length"] = _verified_iv_length(iv_length, header["algorithm"])

    # Frame length is checked against the content type resolved above.
    (frame_length,) = unpack_values(">I", tee_stream)
    header["frame_length"] = _verified_frame_length(frame_length, header["content_type"])

    return MessageHeader(**header)