in src/local_gpu_verifier/src/verifier/cc_admin_utils.py [0:0]
def ocsp_certificate_chain_validation(cert_chain, settings, mode):
""" A static method to perform the ocsp status check of the input certificate chain along with the
signature verification and the cert chain verification if the ocsp response message received.
Args:
cert_chain (list): the list of the input certificates of the certificate chain.
settings (config.HopperSettings): the object containing the various config info.
mode (<enum 'CERT CHAIN VERIFICATION MODE'>): Used to determine if the certificate chain
verification is for the GPU attestation certificate chain or RIM certificate chain
or the ocsp response certificate chain.
Returns:
[Bool]: True if the ocsp status of all the appropriate certificates in the
certificate chain, otherwise False.
"""
assert isinstance(cert_chain, list)
revoked_status = False
start_index = 0
gpu_attestation_warning_msg_list = []
if mode == BaseSettings.Certificate_Chain_Verification_Mode.GPU_ATTESTATION:
start_index = 1
end_index = len(cert_chain) - 1
for i, cert in enumerate(cert_chain):
cert_chain[i] = cert.to_cryptography()
for i in range(start_index, end_index):
cert_common_name = cert_chain[i].subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value
# Fetch OCSP Response from provided OCSP Service
nonce = (
CcAdminUtils.generate_nonce(BaseSettings.SIZE_OF_NONCE_IN_BYTES)
if BaseSettings.OCSP_NONCE_ENABLED
else None
)
ocsp_request = CcAdminUtils.build_ocsp_request(cert_chain[i], cert_chain[i + 1], nonce)
try:
ocsp_response = function_wrapper_with_timeout(
[
CcAdminUtils.fetch_ocsp_response_from_url,
ocsp_request.public_bytes(serialization.Encoding.DER),
BaseSettings.OCSP_URL,
BaseSettings.OCSP_RETRY_COUNT,
"send_ocsp_request",
],
BaseSettings.MAX_OCSP_REQUEST_TIME_DELAY * BaseSettings.OCSP_RETRY_COUNT,
)
except Exception as e:
event_log.error(f"Exception occurred while fetching OCSP response from provided OCSP service: {str(e)}")
ocsp_response = None
# Fallback to Nvidia OCSP Service if the fetch fails
if ocsp_response is None:
nonce = CcAdminUtils.generate_nonce(BaseSettings.SIZE_OF_NONCE_IN_BYTES)
ocsp_request = CcAdminUtils.build_ocsp_request(cert_chain[i], cert_chain[i + 1], nonce)
try:
ocsp_response = function_wrapper_with_timeout(
[
CcAdminUtils.fetch_ocsp_response_from_url,
ocsp_request.public_bytes(serialization.Encoding.DER),
BaseSettings.OCSP_URL_NVIDIA,
BaseSettings.OCSP_RETRY_COUNT,
"send_ocsp_request",
],
BaseSettings.MAX_OCSP_REQUEST_TIME_DELAY * BaseSettings.OCSP_RETRY_COUNT,
)
except Exception as e:
event_log.error(f"Exception occurred while fetching OCSP response from Nvidia OCSP service: {str(e)}")
ocsp_response = None
# Raise error if OCSP response is not fetched from both OCSP services
if ocsp_response is None:
error_msg = f"Failed to fetch the ocsp response for certificate {cert_common_name}"
info_log.error(f"\t\t\t{error_msg}")
raise OCSPFetchError(error_msg)
# Verify the OCSP response status
if ocsp_response.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
error_msg = "Couldn't receive a proper response from the OCSP server."
info_log.error(f"\t\t{error_msg}")
return False, error_msg
# Verify the Nonce in the OCSP response
if nonce is not None and nonce != ocsp_response.extensions.get_extension_for_class(OCSPNonce).value.nonce:
error_msg = "The nonce in the OCSP response message is not matching with the one passed in the OCSP request message."
info_log.error(f"\t\t{error_msg}")
return False, error_msg
elif i == end_index - 1:
info_log.debug("\t\tGPU Certificate OCSP Nonce is matching")
# Verify the OCSP response is within the validity period
timestamp_format = "%Y/%m/%d %H:%M:%S UTC"
this_update = ocsp_response.this_update_utc
next_update = ocsp_response.next_update_utc
next_update_extended = next_update + timedelta(hours=BaseSettings.OCSP_VALIDITY_EXTENSION_HRS)
utc_now = datetime.now(timezone.utc)
event_log.debug(f"Current time: {utc_now.strftime(timestamp_format)}")
event_log.debug(f"OCSP this update: {this_update.strftime(timestamp_format)}")
event_log.debug(f"OCSP next update: {next_update.strftime(timestamp_format)}")
event_log.debug(f"OCSP next update extended: {next_update_extended.strftime(timestamp_format)}")
# Outside validity period, print warning
if not (this_update <= utc_now <= next_update):
ocsp_outside_validity_msg = f"OCSP FOR {cert_common_name} IS EXPIRED AFTER {next_update.strftime(timestamp_format)}."
event_log.warning(ocsp_outside_validity_msg)
gpu_attestation_warning_msg_list.append(ocsp_outside_validity_msg)
# Outside extended validity period
if not (this_update <= utc_now <= next_update_extended):
ocsp_outside_extended_validity_msg = (
f"OCSP FOR {cert_common_name} IS EXPIRED AND IS NO LONGER VALID FOR ATTESTATION "
f"AFTER {next_update_extended.strftime(timestamp_format)}."
)
event_log.error(ocsp_outside_extended_validity_msg)
info_log.error(f"\t\tERROR: {ocsp_outside_extended_validity_msg}")
return False, ocsp_outside_extended_validity_msg
# Verifying the ocsp response certificate chain.
ocsp_response_leaf_cert = crypto.load_certificate(
type=crypto.FILETYPE_ASN1,
buffer=ocsp_response.certificates[0].public_bytes(serialization.Encoding.DER),
)
ocsp_cert_chain = [ocsp_response_leaf_cert]
for j in range(i, len(cert_chain)):
ocsp_cert_chain.append(CcAdminUtils.convert_cert_from_cryptography_to_pyopenssl(cert_chain[j]))
ocsp_cert_chain_verification_status = CcAdminUtils.verify_certificate_chain(
ocsp_cert_chain, settings, BaseSettings.Certificate_Chain_Verification_Mode.OCSP_RESPONSE
)
if not ocsp_cert_chain_verification_status:
error_msg = f"The ocsp response certificate chain verification failed for {cert_common_name}."
info_log.error(f"\t\t{error_msg}")
return False, error_msg
elif i == end_index - 1:
info_log.debug("\t\tGPU Certificate OCSP Cert chain is verified")
# Verifying the signature of the ocsp response message.
if not CcAdminUtils.verify_ocsp_signature(ocsp_response):
error_msg = f"The ocsp response response for certificate {cert_common_name} failed due to signature verification failure."
info_log.error(f"\t\t{error_msg}")
return False, error_msg
elif i == end_index - 1:
info_log.debug("\t\tGPU Certificate OCSP Signature is verified")
# The OCSP response certificate status is unknown
if ocsp_response.certificate_status == ocsp.OCSPCertStatus.UNKNOWN:
error_msg = f"The {cert_common_name} certificate revocation status is UNKNOWN"
info_log.error(f"\t\t\t{error_msg}")
return False, error_msg
# The OCSP response certificate status is revoked
if ocsp_response.certificate_status == ocsp.OCSPCertStatus.REVOKED:
# Get cert revoke timestamp
cert_revocation_extension_hrs = 0
if mode == BaseSettings.Certificate_Chain_Verification_Mode.GPU_ATTESTATION:
cert_revocation_extension_hrs = BaseSettings.OCSP_CERT_REVOCATION_DEVICE_EXTENSION_HRS
elif mode == BaseSettings.Certificate_Chain_Verification_Mode.DRIVER_RIM_CERT:
cert_revocation_extension_hrs = BaseSettings.OCSP_CERT_REVOCATION_DRIVER_RIM_EXTENSION_HRS
elif mode == BaseSettings.Certificate_Chain_Verification_Mode.VBIOS_RIM_CERT:
cert_revocation_extension_hrs = BaseSettings.OCSP_CERT_REVOCATION_VBIOS_RIM_EXTENSION_HRS
cert_revocation_time = ocsp_response.revocation_time_utc
cert_revocation_reason = ocsp_response.revocation_reason
cert_revocation_time_extended = cert_revocation_time + timedelta(hours=cert_revocation_extension_hrs)
# Cert is revoked, print warning
cert_revocation_msg = (
f"THE CERTIFICATE {cert_common_name} IS REVOKED FOR '{cert_revocation_reason.value}' "
f"AT {cert_revocation_time.strftime(timestamp_format)}."
)
event_log.warning(cert_revocation_msg)
gpu_attestation_warning_msg_list.append(cert_revocation_msg)
# Cert is revoked but certificate_hold is allowed
if x509.ReasonFlags.certificate_hold == cert_revocation_reason and BaseSettings.allow_hold_cert:
cert_revocation_hold_allowed_msg = (
f"THE CERTIFICATE {cert_common_name} IS REVOKED FOR '{cert_revocation_reason.value}' "
f"BUT STILL GOOD FOR ATTESTATION WITH allow_hold_cert ENABLED."
)
event_log.warning(cert_revocation_hold_allowed_msg)
gpu_attestation_warning_msg_list.append(cert_revocation_hold_allowed_msg)
# Cert is revoked but within the extension period
elif datetime.now(timezone.utc) <= cert_revocation_time_extended:
cert_revocation_within_extension_msg = (
f"THE CERTIFICATE {cert_common_name} IS REVOKED FOR '{cert_revocation_reason.value}' "
f"BUT STILL GOOD FOR ATTESTATION UNTIL {cert_revocation_time_extended.strftime(timestamp_format)} WITH "
f"{cert_revocation_extension_hrs} HOURS OF GRACE PERIOD."
)
event_log.warning(cert_revocation_within_extension_msg)
gpu_attestation_warning_msg_list.append(cert_revocation_within_extension_msg)
# Cert is revoked and outside the extension period
else:
cert_revocation_novalid_msg = (
f"THE CERTIFICATE {cert_common_name} IS REVOKED FOR '{cert_revocation_reason.value}' "
f"AND NO LONGER GOOD FOR ATTESTATION AFTER {cert_revocation_time_extended.strftime(timestamp_format)}."
)
event_log.error(cert_revocation_novalid_msg)
gpu_attestation_warning_msg_list.append(cert_revocation_novalid_msg)
info_log.error(f"\t\t\tERROR: {cert_revocation_novalid_msg}")
info_log.error("\t\t\tThe certificate chain revocation status verification was not successful")
return False, '\n'.join(gpu_attestation_warning_msg_list)
info_log.info(f"\t\t\tThe certificate chain revocation status verification successful.")
return True, '\n'.join(gpu_attestation_warning_msg_list)