def get_ephemeral_key()

in cvm-attestation/tpm_wrapper.py [0:0]


  def get_ephemeral_key(self, pcr_list):
    """
    Create a PCR-policy-bound RSA primary key in the TPM and certify it.

    The method opens its own TPM connection, computes a policy digest over
    the requested PCRs in a trial session, creates an RSA-2048 decryption
    key under the Owner hierarchy with that digest as its authPolicy, and
    has the TPM certify the new key with the signing key located at
    int(AIK_PUB_INDEX, 16) + 3.

    Args:
      pcr_list: PCR identifiers forwarded unchanged to self.get_pcr_select()
        and self.get_pcr_values().

    Returns:
      A 3-tuple (ephemeral_key, key_handle, tpm):
        ephemeral_key: EphemeralKey built from the key's public area (as a
          TPM2B), the certify info bytes and the certify signature.
        key_handle: handle of the created primary key.
        tpm: the still-open Tpm connection. It is intentionally left open
          because the key handle is only usable through it; the caller is
          responsible for closing it.

    Raises:
      Exception: if CreatePrimary returns no key handle.
      Any exception raised by the TPM calls is propagated after the TPM
      connection opened here has been closed.
    """
    tpm = Tpm()
    tpm.connect()

    try:
      pcr_select = self.get_pcr_select(pcr_list)
      pcrs = self.get_pcr_values(pcr_list)

      # Template for the key: a TPM-generated, non-duplicable decryption key
      # that is exempt from dictionary-attack lockout (noDA).
      attributes = (
        TPMA_OBJECT.decrypt |
        TPMA_OBJECT.fixedTPM |
        TPMA_OBJECT.fixedParent |
        TPMA_OBJECT.sensitiveDataOrigin |
        TPMA_OBJECT.noDA
      )
      # RSA-2048, no symmetric algorithm, no scheme.
      # NOTE(review): exponent 0 presumably selects the TPM default exponent
      # -- confirm against the TPM 2.0 specification.
      parameters = TPMS_RSA_PARMS(
        TPMT_SYM_DEF_OBJECT(),
        TPMS_NULL_ASYM_SCHEME(),
        2048,
        0
      )
      in_public = TPMT_PUBLIC(
        TPM_ALG_ID.SHA256, attributes,
        None,
        parameters,
        TPM2B_PUBLIC_KEY_RSA()
      )

      # Handle of the key used to sign the certification.
      # NOTE(review): presumably the AIK persistent handle sits 3 above the
      # AIK public index -- confirm.
      sign = TPM_HANDLE(int(AIK_PUB_INDEX, 16) + 3)

      # Start a trial policy session; it is used only to have the TPM compute
      # the PCR policy digest that becomes the key's authPolicy.
      nonceCaller = crypto.randomBytes(20)
      respSas = tpm.StartAuthSession(None, None, nonceCaller, None, TPM_SE.TRIAL, NullSymDef, TPM_ALG_ID.SHA256)
      hSess = respSas.handle
      self.log.info('DRS >> StartAuthSession(POLICY_SESS) returned ' + str(tpm.lastResponseCode) + '; sess handle: ' + str(hSess.handle))

      # Retrieve the policy digest computed by the TPM.
      # sha256_hash_update() yields a hex string, hence bytes.fromhex().
      pcr_digest = self.sha256_hash_update(pcrs)
      tpm.PolicyPCR(hSess, bytes.fromhex(pcr_digest), pcr_select)
      dupPolicyDigest = tpm.PolicyGetDigest(hSess)
      in_public.authPolicy = dupPolicyDigest
      self.log.info('DRS >> PolicyGetDigest() returned ' + str(tpm.lastResponseCode))

      # Create RSA Key
      idKey = tpm.withSession(NullPwSession).CreatePrimary(
        Owner, TPMS_SENSITIVE_CREATE(), in_public, None, pcr_select
      )
      self.log.info('DRS >> CreatePrimary(idKey) returned ' + str(tpm.lastResponseCode))
      self.log.info('CreatePrimary returned ' + str(tpm.lastResponseCode))

      # Validate the result before touching any of its fields.
      key_handle = idKey.getHandle()
      if not key_handle:
        # str() is required: in_public is a TPMT_PUBLIC, not a string.
        raise Exception("CreatePrimary failed for " + str(in_public))

      encryption_key = idKey.outPublic.asTpm2B()

      # Have the TPM certify the new key with the signing key.
      response = tpm.Certify(key_handle, sign, 0, TPMS_NULL_ASYM_SCHEME())
      buf = TpmBuffer(response.certifyInfo.asTpm2B()).createObj(TPM2B_ATTEST)
      self.log.info(buf.attestationData.attested)
      certify_info = response.certifyInfo.toBytes()
      signature = response.signature.sig

      ephemeral_Key = EphemeralKey(encryption_key, certify_info, signature)

      # NOTE(review): presumably flushes the loaded (trial) session slots --
      # confirm against cleanSlots().
      self.cleanSlots(tpm, TPM_HT.LOADED_SESSION)
    except BaseException:
      # On failure the connection is never handed to the caller, so nobody
      # else could close it; release it here and re-raise unchanged.
      tpm.close()
      raise

    # not closing TPM connection since we need the key handle
    return ephemeral_Key, key_handle, tpm