in src/aws_encryption_sdk/streaming_client.py [0:0]
def _read_header(self):
"""Reads the message header from the input stream.
:returns: tuple containing deserialized header and header_auth objects
:rtype: tuple of aws_encryption_sdk.structures.MessageHeader
and aws_encryption_sdk.internal.structures.MessageHeaderAuthentication
:raises CustomMaximumValueExceeded: if frame length is greater than the custom max value
:raises CustomMaximumValueExceeded: if number of encrypted data keys is greater than the custom max value
"""
header, raw_header = deserialize_header(self.source_stream, self.config.max_encrypted_data_keys)
self.__unframed_bytes_read += len(raw_header)
validate_commitment_policy_on_decrypt(self.config.commitment_policy, header.algorithm)
validate_signature_policy_on_decrypt(self.config.signature_policy, header.algorithm)
if (
self.config.max_body_length is not None
and header.content_type == ContentType.FRAMED_DATA
and header.frame_length > self.config.max_body_length
):
raise CustomMaximumValueExceeded(
"Frame Size in header found larger than custom value: {found:d} > {custom:d}".format(
found=header.frame_length, custom=self.config.max_body_length
)
)
decrypt_materials_request = self._create_decrypt_materials_request(header)
decryption_materials = self.config.materials_manager.decrypt_materials(request=decrypt_materials_request)
# If the materials_manager passed required_encryption_context_keys,
# get the items out of the encryption_context with the keys.
# The items are used in header validation.
if hasattr(decryption_materials, "required_encryption_context_keys"):
self._required_encryption_context = {}
for (key, value) in decryption_materials.encryption_context.items():
if key in decryption_materials.required_encryption_context_keys:
self._required_encryption_context[key] = value
else:
self._required_encryption_context = None
if decryption_materials.verification_key is None:
self.verifier = None
else:
# MPL verification key is NOT key bytes; it is bytes of the compressed point.
# If the underlying CMM is from the MPL, load bytes from encoded point.
if (_HAS_MPL
and isinstance(self.config.materials_manager, CryptoMaterialsManagerFromMPL)):
self.verifier = Verifier.from_encoded_point(
algorithm=header.algorithm,
encoded_point=base64.b64encode(decryption_materials.verification_key)
)
else:
self.verifier = Verifier.from_key_bytes(
algorithm=header.algorithm, key_bytes=decryption_materials.verification_key
)
if self.verifier is not None:
self.verifier.update(raw_header)
header_auth = deserialize_header_auth(
version=header.version, stream=self.source_stream, algorithm=header.algorithm, verifier=self.verifier
)
self._derived_data_key = derive_data_encryption_key(
source_key=decryption_materials.data_key.data_key, algorithm=header.algorithm, message_id=header.message_id
)
if header.algorithm.is_committing():
expected_commitment_key = calculate_commitment_key(
source_key=decryption_materials.data_key.data_key,
algorithm=header.algorithm,
message_id=header.message_id,
)
if not hmac.compare_digest(expected_commitment_key, header.commitment_key):
raise MasterKeyProviderError(
"Key commitment validation failed. Key identity does not match the identity asserted in the "
"message. Halting processing of this message."
)
return self._validate_parsed_header(
header=header,
header_auth=header_auth,
raw_header=raw_header,
)