in cvm-attestation/AttestationClient.py [0:0]
def attest_guest(self):
"""
Attest the Hardware and Guest
"""
# Attest the platform using exponential backoff
max_retries = 5
retries = 0
backoff_factor = 1
while retries < max_retries:
try:
self.log.info('Attesting Guest Evidence...')
hardware_evidence = self.get_hardware_evidence()
hw_report = hardware_evidence.hardware_report # td_quote or snp report
runtime_data = hardware_evidence.runtime_data
report_type = hardware_evidence.type
# get the isolation information for the platform
hw_evidence = ""
imds_client = ImdsClient(self.log)
if report_type == 'tdx':
hw_evidence = TdxEvidence(hw_report, runtime_data)
elif report_type == 'snp':
cert_chain = imds_client.get_vcek_certificate()
hw_evidence = SnpEvidence(hw_report, runtime_data, cert_chain)
else:
self.log.info('Invalid Hardware Report Type')
# Collect guest attestation parameters
os_info = OsInfo()
tss_wrapper = TssWrapper(self.log)
aik_cert = tss_wrapper.get_aik_cert()
aik_pub = tss_wrapper.get_aik_pub()
pcr_quote, sig = tss_wrapper.get_pcr_quote(os_info.pcr_list)
pcr_values = tss_wrapper.get_pcr_values(os_info.pcr_list)
key, key_handle, tpm = tss_wrapper.get_ephemeral_key(os_info.pcr_list)
tpm_info = TpmInfo(aik_cert, aik_pub, pcr_quote, sig, pcr_values, key)
tcg_logs = get_measurements(os_info.type)
isolation = Isolation(self.parameters.isolation_type, hw_evidence)
param = GuestAttestationParameters(os_info, tcg_logs, tpm_info, isolation)
# Calls attestation provider with the guest evidence
request = {
"AttestationInfo": Encoder.base64url_encode_string(param.toJson())
}
encoded_response = self.provider.attest_guest(request)
# Check the response from the server if there is an error
# we retry until all retries have been exhausted
if encoded_response:
self.log.info('Parsing encoded token...')
# decode the response
response = urlsafe_b64decode(encoded_response + '==').decode('utf-8')
response = json.loads(response)
# parse encrypted inner key
encrypted_inner_key = response['EncryptedInnerKey']
encrypted_inner_key = json.dumps(encrypted_inner_key)
encrypted_inner_key_decoded = Encoder.base64decode(encrypted_inner_key)
# parse Encryption Parameters
encryption_params_json = response['EncryptionParams']
iv = json.dumps(encryption_params_json['Iv'])
iv = Encoder.base64decode(iv)
auth_data = response['AuthenticationData']
auth_data = json.dumps(auth_data)
auth_data = Encoder.base64decode(auth_data)
decrypted_inner_key = \
tss_wrapper.decrypt_with_ephemeral_key(
encrypted_inner_key_decoded,
os_info.pcr_list,
key_handle,
tpm
)
# parse the encrypted token
encrypted_jwt = response['Jwt']
encrypted_jwt = json.dumps(encrypted_jwt)
encrypted_jwt = Encoder.base64decode(encrypted_jwt)
# Your AES key
key = decrypted_inner_key
# Create an AESGCM object with the generated key
aesgcm = AESGCM(key)
self.log.info('Decrypting JWT...')
associated_data = bytearray(b'Transport Key')
# NOTE: authentication data is part of the cipher's last 16 bytes
cipher_message = encrypted_jwt + auth_data
# Decrypt the token using the same key, nonce, and associated data
decrypted_data = aesgcm.decrypt(iv, cipher_message, bytes(associated_data))
self.log.info("Decrypted JWT Successfully.")
self.log.info('TOKEN:')
self.log.info(decrypted_data.decode('utf-8'))
encoded_token = decrypted_data.decode('utf-8')
self.provider.print_guest_claims(encoded_token)
return decrypted_data
else:
self.log.error("Token was not received from attestation provider")
retries += 1
if retries < max_retries:
sleep_time = backoff_factor * (2 ** (retries - 1))
self.log.info(f"Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)
else:
self.log.error("Token was not received from attestation provider")
except RequestException as e:
self.log.error(f"Request to attest platform failed with an exception: {e}")
retries += 1
if retries < max_retries:
sleep_time = backoff_factor * (2 ** (retries - 1))
self.log.info(f"Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)
else:
self.log.error(
f"Request failed after all retries have been exhausted. Error: {e}"
)