in src/aws_encryption_sdk/internal/formatting/serialize.py [0:0]
def _serialize_header_v2(header, signer=None):
    """Produce the serialized form of a SerializationVersion.V2 message header.

    The fields are emitted big endian, in this order: version, algorithm ID,
    message ID, encryption context (length-prefixed), encrypted data keys
    (count-prefixed), content type, frame length and, when the algorithm
    reports itself as committing, the commitment key.

    :param header: Header to serialize
    :type header: aws_encryption_sdk.structures.MessageHeader
    :param signer: Cryptographic signer object (optional); when supplied it is
        updated with the complete serialized header
    :type signer: aws_encryption_sdk.internal.crypto.Signer
    :returns: Serialized header
    :rtype: bytes
    """
    context_blob = aws_encryption_sdk.internal.formatting.encryption_context.serialize_encryption_context(
        header.encryption_context
    )
    context_size = len(context_blob)

    # Layout of the leading fields:
    #   ">"    big endian
    #   "B"    version
    #   "H"    algorithm ID
    #   "32s"  message ID
    #   "H"    encryption context length
    #   "<n>s" serialized encryption context
    preamble_layout = ">BH32sH{}s".format(context_size)
    chunks = [
        struct.pack(
            preamble_layout,
            header.version.value,
            header.algorithm.algorithm_id,
            header.message_id,
            context_size,
            context_blob,
        )
    ]

    # Serialize every encrypted data key first, then emit the key count
    # followed by the concatenated key bytes.
    key_blob = bytearray()
    for edk in header.encrypted_data_keys:
        key_blob.extend(serialize_encrypted_data_key(edk))
    chunks.append(struct.pack(">H", len(header.encrypted_data_keys)))
    chunks.append(key_blob)

    chunks.append(struct.pack(">B", header.content_type.value))
    chunks.append(struct.pack(">I", header.frame_length))

    # The commitment key is only written for committing algorithms; its field
    # width comes from the algorithm's suite data length.
    if header.algorithm.is_committing():
        suite_data_layout = ">{}s".format(header.algorithm.algorithm_suite_data_length())
        chunks.append(struct.pack(suite_data_layout, header.commitment_key))

    serialized = b"".join(chunks)
    if signer is not None:
        signer.update(serialized)
    return serialized