in src/aws_encryption_sdk/streaming_client.py [0:0]
def generate_header(self, message_id):
"""Generates the header object.
:param message_id: The randomly generated id for the message
:type message_id: bytes
"""
version = VERSION
if self._encryption_materials.algorithm.message_format_version == 0x02:
version = SerializationVersion.V2
# If the underlying materials_provider provided required_encryption_context_keys
# (ex. if the materials_provider is a required encryption context CMM),
# then partition the encryption context based on those keys.
if hasattr(self._encryption_materials, "required_encryption_context_keys"):
self._required_encryption_context = {}
self._stored_encryption_context = {}
for (key, value) in self._encryption_materials.encryption_context.items():
if key in self._encryption_materials.required_encryption_context_keys:
self._required_encryption_context[key] = value
else:
self._stored_encryption_context[key] = value
# Otherwise, store all encryption context with the message.
else:
self._stored_encryption_context = self._encryption_materials.encryption_context
self._required_encryption_context = None
kwargs = dict(
version=version,
algorithm=self._encryption_materials.algorithm,
message_id=message_id,
encryption_context=self._stored_encryption_context,
encrypted_data_keys=self._encryption_materials.encrypted_data_keys,
content_type=self.content_type,
frame_length=self.config.frame_length,
)
if self._encryption_materials.algorithm.is_committing():
commitment_key = calculate_commitment_key(
source_key=self._encryption_materials.data_encryption_key.data_key,
algorithm=self._encryption_materials.algorithm,
message_id=message_id,
)
kwargs["commitment_key"] = commitment_key
if version == SerializationVersion.V1:
kwargs["type"] = TYPE
kwargs["content_aad_length"] = 0
kwargs["header_iv_length"] = self._encryption_materials.algorithm.iv_len
return MessageHeader(**kwargs)