in src/aws_encryption_sdk/streaming_client.py [0:0]
def _read_header(self):
    """Deserialize, policy-check, and authenticate the message header.

    Reads the header from ``self.source_stream``, checks its algorithm against the
    configured commitment and signature policies, enforces the optional frame-length
    cap, requests decryption materials from ``self.config.materials_manager``, and
    validates the header against the derived data key.

    Side effects: adds the raw header length to ``self.__unframed_bytes_read`` and sets
    ``self.verifier`` and ``self._derived_data_key``.

    :returns: tuple containing deserialized header and header_auth objects
    :rtype: tuple of aws_encryption_sdk.structures.MessageHeader
        and aws_encryption_sdk.internal.structures.MessageHeaderAuthentication
    :raises CustomMaximumValueExceeded: if the content is framed and the header's frame length
        is greater than ``config.max_body_length``
    :raises CustomMaximumValueExceeded: if number of encrypted data keys is greater than the custom max value
    :raises MasterKeyProviderError: if the algorithm is committing and the calculated commitment key
        does not match the one carried in the header
    """
    header, raw_header = deserialize_header(self.source_stream, self.config.max_encrypted_data_keys)
    # NOTE: the double-underscore attribute is name-mangled by Python when this method
    # lives inside its class body; keep the spelling exactly as-is.
    self.__unframed_bytes_read += len(raw_header)

    algorithm = header.algorithm

    # Policy checks run before any key material is requested.
    validate_commitment_policy_on_decrypt(self.config.commitment_policy, algorithm)
    validate_signature_policy_on_decrypt(self.config.signature_policy, algorithm)

    # The frame-length cap only applies when a cap is configured and the content is framed.
    max_body_length = self.config.max_body_length
    if max_body_length is not None and header.content_type == ContentType.FRAMED_DATA:
        if header.frame_length > max_body_length:
            raise CustomMaximumValueExceeded(
                "Frame Size in header found larger than custom value: {found:d} > {custom:d}".format(
                    found=header.frame_length, custom=max_body_length
                )
            )

    materials = self.config.materials_manager.decrypt_materials(
        request=DecryptionMaterialsRequest(
            encrypted_data_keys=header.encrypted_data_keys,
            algorithm=algorithm,
            encryption_context=header.encryption_context,
            commitment_policy=self.config.commitment_policy,
        )
    )

    # A verifier exists only when the materials carry a verification key.
    verification_key = materials.verification_key
    self.verifier = (
        None
        if verification_key is None
        else Verifier.from_key_bytes(algorithm=algorithm, key_bytes=verification_key)
    )
    # Feed the raw header bytes to the verifier before the header auth section is parsed.
    if self.verifier is not None:
        self.verifier.update(raw_header)

    header_auth = deserialize_header_auth(
        version=header.version, stream=self.source_stream, algorithm=algorithm, verifier=self.verifier
    )

    source_key = materials.data_key.data_key
    self._derived_data_key = derive_data_encryption_key(
        source_key=source_key, algorithm=algorithm, message_id=header.message_id
    )

    if algorithm.is_committing():
        calculated_commitment = calculate_commitment_key(
            source_key=source_key,
            algorithm=algorithm,
            message_id=header.message_id,
        )
        # hmac.compare_digest gives a constant-time comparison of the two commitment values.
        commitment_matches = hmac.compare_digest(calculated_commitment, header.commitment_key)
        if not commitment_matches:
            raise MasterKeyProviderError(
                "Key commitment validation failed. Key identity does not match the identity asserted in the "
                "message. Halting processing of this message."
            )

    validate_header(header=header, header_auth=header_auth, raw_header=raw_header, data_key=self._derived_data_key)
    return header, header_auth