in cacheck/ccadb/db.py [0:0]
def lint_issues_for_ca_ids(self, ca_ids, daterange, cert_options, linters):
    """
    Find the lint issues recorded for certificates issued by the given CAs.

    :param ca_ids: Iterable of issuing CA ids; every id is converted with
        ``str()`` and matched against ``lint_cert_issue.issuer_ca_id``
    :param daterange: Indexable ``(start, end)``; each entry is either falsy
        (no bound) or an object providing ``strftime`` (presumably a
        ``date``/``datetime`` -- confirm against callers). Both bounds are
        exclusive and are compared with ``lint_cert_issue.not_before_date``
    :param cert_options: Four flags, in this order: ``exclude_onecrl``,
        ``exclude_expired_certs``, ``exclude_revoked``,
        ``exclude_technically_constrained``
    :param linters: Flags aligned position by position with
        ``("cablint", "zlint", "x509lint")``; a truthy flag selects the linter
    :return: One dict per matching row with the keys ``certificate_id``,
        ``issue_text``, ``lint_issue_id``, ``issuer_ca_id``,
        ``not_before_date``, ``not_after_date``, ``linter``, ``severity``,
        ``issuer_cn``, ``subject_cn``, ``sha256_fingerprint``,
        ``google_revoked``, ``onecrl_revoked`` and ``microsoft_revoked``
    :rtype: list
    :raises RuntimeError: If none of the known linters is selected
    """
    lint_names = ("cablint", "zlint", "x509lint")
    # Derive the selection once and validate that, so the guard and the query
    # can never disagree (e.g. a flag set only beyond the known linters).
    selected_linters = [
        name for name, enabled in zip(lint_names, linters) if enabled
    ]
    if not selected_linters:
        raise RuntimeError("Error! No linters selected!")

    (exclude_onecrl, exclude_expired_certs, exclude_revoked,
     exclude_technically_constrained) = cert_options
    start_date = daterange[0]
    end_date = daterange[1]

    # Both filters are sent as PostgreSQL array literals bound to ANY(%s).
    # str() lets callers pass integer ids as well as strings.
    query_conf = '{' + ','.join(selected_linters) + '}'
    caids_conf = '{' + ','.join(str(ca_id) for ca_id in ca_ids) + '}'
    args = [caids_conf, query_conf]

    debug = app.config['DEBUG']
    if debug:
        print("ca ids:", ca_ids, ", start: ", start_date, ", end:", end_date, ", linters: ", linters)
        print(selected_linters)
        print(query_conf)

    # Column order here must stay in sync with ``fields`` below.
    sql_parts = [
        "SELECT lint_cert_issue.certificate_id, lint_issue.issue_text, "
        "lint_issue_id, certificate.issuer_ca_id, "
        "x509_notbefore(certificate.certificate), "
        "x509_notafter(certificate.certificate), "
        "issue_text, linter, severity, ",
        "x509_issuername(certificate.certificate), x509_subjectname(certificate.certificate), ",
        "encode(digest(certificate.certificate, 'sha256'), 'hex'), ",
        "EXISTS(SELECT certificate_id FROM google_revoked WHERE certificate_id=certificate.id), ",
        "EXISTS(SELECT certificate_id FROM mozilla_onecrl WHERE certificate_id=certificate.id), ",
        "EXISTS(SELECT certificate_id FROM microsoft_disallowedcert WHERE certificate_id=certificate.id) ",
        "FROM lint_cert_issue LEFT JOIN lint_issue ",
        "ON lint_cert_issue.lint_issue_id=lint_issue.id ",
        # The certificate table is joined so expiry/constraint filters can
        # inspect the certificate itself.
        "LEFT JOIN certificate ",
        "ON lint_cert_issue.certificate_id=certificate.id ",
        "WHERE lint_cert_issue.issuer_ca_id = ANY(%s) ",
        "AND linter = ANY(%s) ",
    ]
    if exclude_expired_certs:
        sql_parts.append("AND x509_notafter(certificate.certificate) > NOW() ")
    if exclude_technically_constrained:
        # TODO: is_technically_constrained2?
        sql_parts.append("AND is_technically_constrained(certificate.certificate) = false ")
    # Placeholders are positional: each bound is appended to ``args`` in the
    # same order its ``%s`` is appended to the statement.
    if start_date:
        sql_parts.append("AND lint_cert_issue.not_before_date > %s ")
        args.append(start_date.strftime('%Y-%m-%d'))
    if end_date:
        sql_parts.append("AND lint_cert_issue.not_before_date < %s ")
        args.append(end_date.strftime('%Y-%m-%d'))
    if exclude_onecrl:
        sql_parts.append("AND lint_cert_issue.certificate_id NOT IN ( ")
        sql_parts.append("SELECT certificate_id FROM mozilla_onecrl WHERE certificate_id IS NOT NULL ) ")
    sql_query = ''.join(sql_parts)

    cursor = self._query_db(sql_query, args)
    try:
        rows = cursor.fetchall()
    finally:
        # Release the cursor even when fetching fails.
        cursor.close()

    # 'issue_text' appears twice because the query selects that column twice;
    # both positions carry the same value, so the resulting dict has one key.
    fields = (
        'certificate_id', 'issue_text', 'lint_issue_id', 'issuer_ca_id',
        'not_before_date', 'not_after_date',
        'issue_text', 'linter', 'severity', 'issuer_cn', 'subject_cn',
        'sha256_fingerprint', 'google_revoked',
        'onecrl_revoked', 'microsoft_revoked',
    )
    revocation_flags = ('onecrl_revoked', 'microsoft_revoked', 'google_revoked')

    lint_issues = []
    for row in rows:
        if debug:
            print(row)
        lint_issue = dict(zip(fields, row))
        # With exclude_revoked set, drop certificates flagged by any of the
        # three revocation sources.
        if exclude_revoked and any(lint_issue[flag] for flag in revocation_flags):
            continue
        lint_issues.append(lint_issue)
    return lint_issues