def attest_guest()

in cvm-attestation/AttestationClient.py [0:0]


  def attest_guest(self):
    """
    Attest the Hardware and Guest
    """

    # Attest the platform using exponential backoff
    max_retries = 5
    retries = 0
    backoff_factor = 1
    while retries < max_retries:
      try:
        self.log.info('Attesting Guest Evidence...')

        hardware_evidence = self.get_hardware_evidence()
        hw_report = hardware_evidence.hardware_report   # td_quote or snp report
        runtime_data = hardware_evidence.runtime_data
        report_type = hardware_evidence.type

        # get the isolation information for the platform
        hw_evidence = ""
        imds_client = ImdsClient(self.log)
        if report_type == 'tdx':
          hw_evidence = TdxEvidence(hw_report, runtime_data)
        elif report_type == 'snp':
          cert_chain = imds_client.get_vcek_certificate()
          hw_evidence = SnpEvidence(hw_report, runtime_data, cert_chain)
        else:
          self.log.info('Invalid Hardware Report Type')


        # Collect guest attestation parameters
        os_info = OsInfo()
        tss_wrapper = TssWrapper(self.log)
        aik_cert = tss_wrapper.get_aik_cert()
        aik_pub = tss_wrapper.get_aik_pub()
        pcr_quote, sig = tss_wrapper.get_pcr_quote(os_info.pcr_list)
        pcr_values = tss_wrapper.get_pcr_values(os_info.pcr_list)
        key, key_handle, tpm = tss_wrapper.get_ephemeral_key(os_info.pcr_list)
        tpm_info = TpmInfo(aik_cert, aik_pub, pcr_quote, sig, pcr_values, key)
        tcg_logs = get_measurements(os_info.type)
        isolation = Isolation(self.parameters.isolation_type, hw_evidence)
        param = GuestAttestationParameters(os_info, tcg_logs, tpm_info, isolation)

        # Calls attestation provider with the guest evidence
        request = {
          "AttestationInfo": Encoder.base64url_encode_string(param.toJson())
        }
        encoded_response = self.provider.attest_guest(request)

        # Check the response from the server if there is an error
        # we retry until all retries have been exhausted
        if encoded_response:
          self.log.info('Parsing encoded token...')

          # decode the response
          response = urlsafe_b64decode(encoded_response + '==').decode('utf-8')
          response = json.loads(response)

          # parse encrypted inner key
          encrypted_inner_key = response['EncryptedInnerKey']
          encrypted_inner_key = json.dumps(encrypted_inner_key)
          encrypted_inner_key_decoded = Encoder.base64decode(encrypted_inner_key)

          # parse Encryption Parameters
          encryption_params_json = response['EncryptionParams']
          iv = json.dumps(encryption_params_json['Iv'])
          iv = Encoder.base64decode(iv)

          auth_data = response['AuthenticationData']
          auth_data = json.dumps(auth_data)
          auth_data = Encoder.base64decode(auth_data)

          decrypted_inner_key = \
            tss_wrapper.decrypt_with_ephemeral_key(
              encrypted_inner_key_decoded,
              os_info.pcr_list,
              key_handle,
              tpm
            )

          # parse the encrypted token
          encrypted_jwt = response['Jwt']
          encrypted_jwt = json.dumps(encrypted_jwt)
          encrypted_jwt = Encoder.base64decode(encrypted_jwt)

          # Your AES key
          key = decrypted_inner_key

          # Create an AESGCM object with the generated key
          aesgcm = AESGCM(key)

          self.log.info('Decrypting JWT...')

          associated_data = bytearray(b'Transport Key')

          # NOTE: authentication data is part of the cipher's last 16 bytes
          cipher_message = encrypted_jwt + auth_data

          # Decrypt the token using the same key, nonce, and associated data
          decrypted_data = aesgcm.decrypt(iv, cipher_message, bytes(associated_data))
          self.log.info("Decrypted JWT Successfully.")
          self.log.info('TOKEN:')
          self.log.info(decrypted_data.decode('utf-8'))

          encoded_token = decrypted_data.decode('utf-8')
          self.provider.print_guest_claims(encoded_token)

          return decrypted_data
        else:
          self.log.error("Token was not received from attestation provider")

          retries += 1
          if retries < max_retries:
            sleep_time = backoff_factor * (2 ** (retries - 1))
            self.log.info(f"Retrying in {sleep_time} seconds...")
            time.sleep(sleep_time)
          else:
            self.log.error("Token was not received from attestation provider")
      except RequestException as e:
        self.log.error(f"Request to attest platform failed with an exception: {e}")

        retries += 1
        if retries < max_retries:
          sleep_time = backoff_factor * (2 ** (retries - 1))
          self.log.info(f"Retrying in {sleep_time} seconds...")
          time.sleep(sleep_time)
        else:
          self.log.error(
            f"Request failed after all retries have been exhausted. Error: {e}"
          )