cacheck/main.py (119 lines of code) (raw):

# [START gae_python311_app]
"""Flask front end for CAChecker.

Exposes routes that look up lint issues and certificate information through
``CCADB`` and return them either as rendered templates or as JSON.
"""
import datetime
import functools
import html
import itertools
from collections import Counter
from collections.abc import Iterable

from flask import Flask, render_template, request, jsonify, make_response

from ccadb.db import CCADB
from lint_dict import LintDict

app = Flask(__name__)
app.config.from_object('config.Config')


def _checkbox_to_bool(value):
    """Return True when a query-string checkbox value is 'on' or 'true' (case-insensitive)."""
    return value.lower() in ('on', 'true')


def _parse_int_id(raw_id, label):
    """
    Convert a URL path segment into an integer ID.

    :param raw_id: the value captured from the route
    :param label: human readable name of the ID, used in the error message
    :return: ``(int_id, None)`` on success, ``(None, (message, 400))`` otherwise
    """
    try:
        return int(raw_id), None
    except ValueError:
        # The message is served as text/html, so the user-supplied value
        # must be escaped before it is echoed back.
        message = "Error! Invalid {} - {}. Please specify an integer.".format(
            label, html.escape(str(raw_id)))
        return None, (message, 400)


def _build_ca_scope(ccadb, ca_id):
    """
    Build the CA tree below ``ca_id`` and the set of CA IDs it covers.

    Reads the optional ``max_depth`` query parameter (-1, the default, means
    unlimited depth).

    :param ccadb: ccadb instance
    :param ca_id: int(CA ID) of the tree root
    :return: ``(ca_tree, ca_cn_map, cca_ids)`` where ``cca_ids`` also
        contains ``ca_id`` itself
    """
    max_depth = request.args.get('max_depth', default=-1, type=int)
    ca_tree, ca_cn_map = ccadb.build_ca_tree(ca_id, max_depth)
    cca_ids = CCADB._rec_get_keys(ca_tree)
    cca_ids.add(ca_id)
    return ca_tree, ca_cn_map, cca_ids


def _count_issues(lints, linter):
    """Count ``(severity, lint_issue_id, issue_text)`` triples reported by one linter."""
    return Counter(
        (lint['severity'], lint['lint_issue_id'], lint['issue_text'])
        for lint in lints
        if lint['linter'] == linter
    )


def get_lints_json(ca_ids):
    """
    Parses lint issue query options and fetches the matching lint issues.

    Query parameters read from the current request: ``start``, ``end``
    (ISO format dates), ``onecrl``, ``expired_certs``,
    ``exclude_technically_constrained``, ``exclude_revoked`` (default off),
    ``x509_lint``, ``cab_lint``, ``z_lint`` (default on) and ``lint_issue``.

    :param ca_ids: iterable of CA IDs (a plain string is rejected)
    :return: lint issues as returned by ``CCADB.lint_issues_for_ca_ids``;
        when ``lint_issue`` is a positive integer, a list restricted to
        that ``lint_issue_id``
    :raises RuntimeError: if ``ca_ids`` is not a non-string iterable
    """
    if not isinstance(ca_ids, Iterable) or isinstance(ca_ids, str):
        raise RuntimeError("Error! ca_ids needs to be an iterable - {} passed".format(type(ca_ids)))

    args = request.args

    # date ranges; a value that fails to parse falls back to None
    start = args.get('start', default=None, type=datetime.datetime.fromisoformat)
    end = args.get('end', default=None, type=datetime.datetime.fromisoformat)

    # cert options
    onecrl = _checkbox_to_bool(args.get('onecrl', default="off"))
    expired_certs = _checkbox_to_bool(args.get('expired_certs', default="off"))
    exclude_tech_constrained = _checkbox_to_bool(
        args.get('exclude_technically_constrained', default="off"))
    exclude_revoked = _checkbox_to_bool(args.get('exclude_revoked', default="off"))

    # lint options
    x509_lint = _checkbox_to_bool(args.get('x509_lint', default="on"))
    cab_lint = _checkbox_to_bool(args.get('cab_lint', default="on"))
    z_lint = _checkbox_to_bool(args.get('z_lint', default="on"))

    # Tuple element order is part of the contract with CCADB.
    daterange = (start, end)
    linters = (cab_lint, z_lint, x509_lint)
    cert_options = (onecrl, expired_certs, exclude_revoked, exclude_tech_constrained)

    lint_issue = args.get('lint_issue', default=-1, type=int)

    ccadb = CCADB()
    lint_issues = ccadb.lint_issues_for_ca_ids(map(str, ca_ids), daterange, cert_options, linters)

    # Non-positive values (including the -1 default) mean "no filter".
    if lint_issue > 0:
        return [issue for issue in lint_issues if issue['lint_issue_id'] == lint_issue]
    return lint_issues


@app.route('/lint_issues/<ca_id>', methods=['GET'])
def get_lint_issues(ca_id):
    """
    Render all lint issues for the query options as an HTML page.

    Issues are bucketed by issuer CN, issue text and linter before being
    handed to the ``lint.html`` template.

    :param ca_id: CA ID at the root of the tree to inspect
    """
    ca_id, error = _parse_int_id(ca_id, "CA ID")
    if error is not None:
        return error

    ccadb = CCADB()
    _, _, cca_ids = _build_ca_scope(ccadb, ca_id)
    lint_issues = get_lints_json(cca_ids)

    # bucket into issuers
    ld = LintDict()
    for issue in lint_issues:
        k = issue['issuer_cn'] + issue['issue_text'] + issue['linter']
        ld[k] = ld[k] + [issue]

    return render_template('lint.html', issuers=ld)


@app.route('/raw_lint_issues/<ca_id>', methods=['GET'])
def get_raw_lint_issues(ca_id):
    """
    Dump all lint issues for query options as JSON

    :param ca_id: CA ID at the root of the tree to inspect
    """
    ca_id, error = _parse_int_id(ca_id, "CA ID")
    if error is not None:
        return error

    ccadb = CCADB()
    _, _, cca_ids = _build_ca_scope(ccadb, ca_id)
    return jsonify(get_lints_json(cca_ids))


def pprint_ca_cert(ccadb, ca_id):
    """
    Print CA cert to meaningful text.

    Keeps only the text before "Subject Public Key Info:" and drops every
    line containing 'Certificate:', 'Signature Algorithm:' or 'Data:'.

    :param ccadb: ccadb instance
    :param ca_id: int(CA ID)
    :return: cert pprint
    :rtype: str
    :raises ValueError: if "Subject Public Key Info:" is not in the text
    """
    ca_print = ccadb.pprint_ca_id(ca_id)[0]
    summary_end = ca_print.index("Subject Public Key Info:")
    summary_text = ca_print[:summary_end].rstrip()

    noise_markers = ('Certificate:', 'Signature Algorithm:', 'Data:')
    kept_lines = [
        line for line in summary_text.split("\n")
        if not any(marker in line for marker in noise_markers)
    ]
    return '\n'.join(kept_lines).rstrip()


def pprint_cert(ccadb, cert_id):
    """
    Fetch certificate information for a certificate ID.

    :param ccadb: ccadb instance
    :param cert_id: certificate ID (not a CA ID)
    :return: whatever ``ccadb.cert_info`` returns for ``cert_id``
    """
    cert_info = ccadb.cert_info(cert_id)
    # Diagnostic output goes through the app logger instead of stdout.
    app.logger.debug("cert_info for cert id %s: %s", cert_id, cert_info)
    return cert_info


@app.route('/summary/<ca_id>')
def summary(ca_id):
    """
    Produce a summary of lint issues for a given CA ID.
    This recursively finds lint issues for intermediate and leaf certs.

    :param ca_id: CA ID to build summary
    """
    ca_id, error = _parse_int_id(ca_id, "CA ID")
    if error is not None:
        return error

    ccadb = CCADB()
    ca_tree, ca_cn_map, cca_ids = _build_ca_scope(ccadb, ca_id)

    # A cert ID and a CA ID are two different concepts; the certificate
    # details are looked up by cert ID.
    cert_id = ccadb.cert_id_from_ca_id(ca_id)
    cert_info = pprint_cert(ccadb, cert_id)

    lints = get_lints_json(cca_ids)
    x509_issues = _count_issues(lints, 'x509lint')
    zs_issues = _count_issues(lints, 'zlint')
    cabs_issues = _count_issues(lints, 'cablint')

    return render_template(
        'summary.html',
        CA_ID=ca_id,
        x509_issues=x509_issues.items(),
        zs_issues=zs_issues.items(),
        cabs_issues=cabs_issues.items(),
        ca_tree=ca_tree,
        cca_ids=cca_ids,
        ca_cn_map=ca_cn_map,
        cert_info=cert_info)


@app.route('/ca_id', methods=['GET'])
def get_ca_id():
    """
    Converts a hex-encoded cert fingerprint to a CA ID.
    SHA-256 or SHA-1 may be used; SHA-256 wins when both are supplied.

    Query parameters: ``sha256_fingerprint``, ``sha1_fingerprint``.
    """
    sha256_fingerprint = request.args.get('sha256_fingerprint')
    sha1_fingerprint = request.args.get('sha1_fingerprint')

    if not sha1_fingerprint and not sha256_fingerprint:
        return "Error! Please supply a SHA-1 or SHA-256 fingerprint", 400

    if sha256_fingerprint:
        digest_type, fp = "sha256", sha256_fingerprint
    else:
        digest_type, fp = "sha1", sha1_fingerprint

    if len(fp) % 2 != 0:
        # Escaped because the fingerprint is user input echoed into an
        # HTML response.
        return "Error! Odd number of hex characters provided. {}".format(html.escape(fp)), 400

    ccadb = CCADB()
    # NOTE(review): this docstring used to say "issuing CA ID" and a call to
    # issuer_ca_id_from_digest was commented out here - confirm which lookup
    # is intended. Also confirm ca_id_from_digest returns a type Flask can
    # use as a response (str/dict/Response), not a bare int.
    return ccadb.ca_id_from_digest(digest_type, fp)


@app.route('/cert/<cert_id>', methods=['GET'])
def get_cert_info(cert_id):
    """
    Dumps JSON certificate information from a cert id

    :param cert_id: certificate ID; a non-integer value yields a 400
    """
    cert_id, error = _parse_int_id(cert_id, "cert ID")
    if error is not None:
        return error

    ccadb = CCADB()
    return jsonify(ccadb.cert_info(cert_id))


@app.route('/ca_cert/<ca_id>', methods=['GET'])
def get_ca_cert_info(ca_id):
    """
    Dumps JSON certificate information from a CA id

    :param ca_id: CA ID; a non-integer value yields a 400
    """
    ca_id, error = _parse_int_id(ca_id, "CA ID")
    if error is not None:
        return error

    ccadb = CCADB()
    cert_id = ccadb.cert_id_from_ca_id(ca_id)
    return jsonify(ccadb.cert_info(int(cert_id)))


@app.route('/')
def root():
    """
    Renders CAChecker index page.
    """
    # No CORS headers are added to the response (they were previously
    # present only as commented-out code).
    resp = make_response(render_template('index.html'))
    return resp


if __name__ == '__main__':
    # Local development entry point only (loopback address, debug mode on).
    app.run(host='127.0.0.1', port=8080, debug=True)
# [END gae_python311_app]