in src/aws_encryption_sdk/internal/formatting/deserialize.py [0:0]
def deserialize_frame(stream, header, verifier=None):
    """Read a single frame of a framed message body from ``stream``.

    Fields are read in order with ``unpack_values``: the sequence number
    (preceded by the end-of-sequence marker when this is the final frame),
    the IV, the content length (final frame only), then the ciphertext and
    the authentication tag. ``verifier`` is handed unchanged to every
    ``unpack_values`` call.

    :param stream: Source data stream
    :type stream: io.BytesIO
    :param header: Deserialized header
    :type header: aws_encryption_sdk.structures.MessageHeader
    :param verifier: Signature verifier object (optional)
    :type verifier: aws_encryption_sdk.internal.crypto.Verifier
    :returns: Deserialized frame and a boolean stating if this is the final frame
    :rtype: :class:`aws_encryption_sdk.internal.structures.MessageFrameBody` and bool
    :raises SerializationError: if the final frame declares a content length that
        is greater than or equal to ``header.frame_length``
    """
    _LOGGER.debug("Starting frame deserialization")

    # The first value read is either the frame's sequence number or the
    # end-of-sequence marker; in the latter case the actual sequence number
    # is the next value in the stream.
    (sequence_number,) = unpack_values(">I", stream, verifier)
    if sequence_number == SequenceIdentifier.SEQUENCE_NUMBER_END.value:
        _LOGGER.debug("Deserializing final frame")
        (sequence_number,) = unpack_values(">I", stream, verifier)
        is_final = True
    else:
        _LOGGER.debug("Deserializing frame sequence number %d", int(sequence_number))
        is_final = False

    # The IV length is dictated by the algorithm named in the header.
    iv_format = ">{iv_len}s".format(iv_len=header.algorithm.iv_len)
    (iv,) = unpack_values(iv_format, stream, verifier)

    if is_final:
        # Only the final frame carries its own content length on the wire.
        (content_length,) = unpack_values(">I", stream, verifier)
        # NOTE(review): a final frame exactly ``header.frame_length`` long is
        # rejected here -- confirm this matches the message format specification.
        if content_length >= header.frame_length:
            raise SerializationError(
                "Invalid final frame length: {final} >= {normal}".format(
                    final=content_length, normal=header.frame_length
                )
            )
    else:
        # Every non-final frame uses the frame length declared in the header.
        content_length = header.frame_length

    # Ciphertext and authentication tag are read together in one call.
    body_format = ">{content_len}s{auth_len}s".format(
        content_len=content_length, auth_len=header.algorithm.auth_len
    )
    (ciphertext, tag) = unpack_values(body_format, stream, verifier)

    frame = MessageFrameBody(
        final_frame=is_final,
        sequence_number=sequence_number,
        iv=iv,
        ciphertext=ciphertext,
        tag=tag,
    )
    return frame, is_final