in cvm-attestation/AttestationClient.py [0:0]
def attest_platform(self):
    """
    Attest the hardware (platform) evidence and return the attestation token.

    Each attempt reads the HCL report through the TSS wrapper, builds the
    hardware evidence matching the configured isolation type (a TD quote for
    TDX, or the SNP report bundled with the VCEK certificate chain for
    SEV-SNP) and submits it, together with the runtime data, to the
    attestation provider.

    Up to 5 attempts are made. An attempt fails when the provider returns an
    empty token or when a ``RequestException`` is raised. Failed attempts are
    separated by an exponential backoff of 1, 2, 4 and 8 seconds.

    Returns:
        The encoded token returned by the provider on success. ``None`` when
        the report type does not match the configured isolation type, or
        when every attempt failed.
    """
    max_retries = 5
    backoff_factor = 1

    for attempt in range(1, max_retries + 1):
        # Remembered outside the handler because `e` is unbound once the
        # `except` block ends.
        request_error = None
        try:
            self.log.info('Attesting Platform Evidence...')
            tss_wrapper = TssWrapper(self.log)
            isolation_type = self.parameters.isolation_type

            # Extract Hardware Report and Runtime Data
            hcl_report = tss_wrapper.get_hcl_report(self.parameters.user_claims)
            report_type = ReportParser.extract_report_type(hcl_report)
            runtime_data = ReportParser.extract_runtimes_data(hcl_report)
            hw_report = ReportParser.extract_hw_report(hcl_report)

            # Set request data based on the platform
            encoded_report = Encoder.base64url_encode(hw_report)
            encoded_runtime_data = Encoder.base64url_encode(runtime_data)
            imds_client = ImdsClient(self.log)
            if report_type == 'tdx' and isolation_type == IsolationType.TDX:
                encoded_hw_evidence = imds_client.get_td_quote(encoded_report)
            elif report_type == 'snp' and isolation_type == IsolationType.SEV_SNP:
                # Logs important SNP fields from the hardware report
                self.log_snp_report(hw_report)
                cert_chain = imds_client.get_vcek_certificate()
                snp_report = json.dumps({
                    'SnpReport': encoded_report,
                    'VcekCertChain': Encoder.base64url_encode(cert_chain),
                })
                encoded_hw_evidence = Encoder.base64url_encode(
                    bytearray(snp_report.encode('utf-8'))
                )
            else:
                # A report type that does not match the configured isolation
                # type leaves no evidence to submit, and neither value
                # changes between attempts, so give up instead of sending
                # empty evidence and backing off.
                self.log.error('Invalid Hardware Report Type')
                return None

            # verify hardware evidence
            encoded_token = self.provider.attest_platform(
                encoded_hw_evidence, encoded_runtime_data
            )
            if encoded_token:
                self.log.info('TOKEN:')
                self.log.info(encoded_token)
                self.provider.print_platform_claims(encoded_token)
                return encoded_token

            # An empty token counts as a failed attempt.
            self.log.error("Token was not received from attestation provider")
        except RequestException as e:
            self.log.error(f"Request to attest platform failed with an exception: {e}")
            request_error = e

        # Single place for the retry bookkeeping shared by both failure modes.
        if attempt < max_retries:
            sleep_time = backoff_factor * (2 ** (attempt - 1))
            self.log.info(f"Retrying in {sleep_time} seconds...")
            time.sleep(sleep_time)
        elif request_error is not None:
            self.log.error(
                f"Request failed after all retries have been exhausted. Error: {request_error}"
            )
        else:
            self.log.error(
                "Token was not received after all retries have been exhausted"
            )

    return None