security-policies/dev/generate_rule_metadata.py
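"""
Generate rule metadata for CIS benchmark rules from the CIS benchmark Excel spreadsheets.

For every implemented rule (a rule that has a directory under
<benchmark_id>/rules/cis_<rule_number>), a data.yaml file is written with the rule's
metadata: description, rationale, audit, remediation, impact, default value, references,
section, tags, and benchmark information.
"""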

import argparse
import os
import uuid
from dataclasses import asdict, dataclass

import common
import pandas as pd
from ruamel.yaml import YAML

yml = YAML()

KSPM_POSTURE_TYPE = "kspm"
CSPM_POSTURE_TYPE = "cspm"


@dataclass
class Benchmark:
    name: str
    version: str
    id: str
    rule_number: str
    posture_type: str


@dataclass
class Rule:
    id: str
    name: str
    profile_applicability: str
    description: str
    rationale: str
    audit: str
    remediation: str
    impact: str
    default_value: str
    references: str
    section: str
    version: str
    tags: list[str]
    benchmark: Benchmark


selected_columns_map = {
    "cis_k8s": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Description": "description",
        "Rational Statement": "rationale",
        "Audit Procedure": "audit",
        "Remediation Procedure": "remediation",
        "Impact Statement": "impact",
        # "": "default_value",  # todo: talk with CIS team to add this column to the excel
        "references": "references",
        "Assessment Status": "type",
    },
    "cis_eks": {
        "section #": "Section",
        "recommendation #": "Rule Number",
        "title": "Title",
        "description": "description",
        "rationale statement": "rationale",
        "audit procedure": "audit",
        "remediation procedure": "remediation",
        "impact statement": "impact",
        # "": "default_value",  # todo: talk with CIS team to add this column to the excel
        "references": "references",
        "scoring status": "type",
    },
    "cis_aws": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Description": "description",
        "Rational Statement": "rationale",
        "Audit Procedure": "audit",
        "Remediation Procedure": "remediation",
        "Impact Statement": "impact",
        # "": "default_value",  # todo: talk with CIS team to add this column to the excel
        "References": "references",
        "Assessment Status": "type",
    },
    "cis_gcp": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Description": "description",
        "Rationale Statement": "rationale",
        "Audit Procedure": "audit",
        "Remediation Procedure": "remediation",
        "Impact Statement": "impact",
        # "": "default_value",  # todo: talk with CIS team to add this column to the excel
        "References": "references",
        "Assessment Status": "type",
    },
    "cis_azure": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Description": "description",
        "Rationale Statement": "rationale",
        "Audit Procedure": "audit",
        "Remediation Procedure": "remediation",
        "Impact Statement": "impact",
        # "": "default_value",  # todo: talk with CIS team to add this column to the excel
        "References": "references",
        "Assessment Status": "type",
    },
}

benchmark_to_posture_type = {
    "cis_k8s": KSPM_POSTURE_TYPE,
    "cis_eks": KSPM_POSTURE_TYPE,
    "cis_aws": CSPM_POSTURE_TYPE,
    "cis_gcp": CSPM_POSTURE_TYPE,
    "cis_azure": CSPM_POSTURE_TYPE,
}


def parse_refs(refs: str):
    """
    Parse references - they are split by `:` which is the worst token possible for urls...
    """
    if refs != "":
        ref = [f"http{ref}" for ref in refs.split(":http") if ref]
        ref[0] = ref[0].removeprefix("http")
        return "\n".join(f"{i + 1}. {s}" for i, s in enumerate(ref))
    return refs


def read_existing_default_value(rule_number, benchmark_id):
    """
    Read default value from existing rule (The excel file doesn't contain default values)
    :param rule_number: Rule number
    :param benchmark_id: Benchmark ID
    :return: Default value
    """
    rule_dir = os.path.join(
        common.rules_dir,
        f"{benchmark_id}/rules",
        f"cis_{rule_number.replace('.', '_')}",
    )
    try:
        with open(os.path.join(rule_dir, "data.yaml"), "r") as f:
            data = yml.load(f)
            default_value = data["metadata"]["default_value"]
            if default_value is None or default_value == "":
                print(
                    f"{benchmark_id}/{rule_number} is missing default value - please make sure to add it manually",
                )
                return ""
            return data["metadata"]["default_value"]
    except FileNotFoundError:
        print(f"Rule implementation for {benchmark_id}/{rule_number} is missing")
        return ""


def generate_rule_benchmark_metadata(benchmark_id: str, rule_number: str):
    """
    Generate benchmark metadata for rules
    :param benchmark_id: Benchmark ID
    :param rule_number: Rule number
    """
    return Benchmark(
        name=common.benchmark[benchmark_id].split("Benchmark")[0].replace("_", " ").removesuffix(" "),
        version=common.benchmark[benchmark_id].split("Benchmark")[1].removeprefix("_").removesuffix(".xlsx"),
        id=f"{benchmark_id}",
        rule_number=rule_number,
        posture_type=benchmark_to_posture_type[benchmark_id],
    )


def replace_nan_with_empty_string(data: pd.DataFrame):
    """
    Replace NaN values with empty strings (they are represented as `nan` in the Excel for some reason)
    """
    return data.replace("nan", "")


def rule_is_implemented(rule_number: str, benchmark_id: str):
    """
    Check if rule was implemented
    :param rule_number: Rule number
    :param benchmark_id: Benchmark ID
    :return: True if rule was implemented, False otherwise
    """
    rule_path = os.path.join(
        common.rules_dir,
        f"{benchmark_id}/rules",
        f"cis_{rule_number.replace('.', '_')}",
    )
    return os.path.isdir(rule_path)


def generate_metadata(benchmark_id: str, raw_data: pd.DataFrame, sections: dict):
    """
    Generate metadata for rules
    :param benchmark_id: Benchmark ID
    :param raw_data: 'Raw' data from the spreadsheet
    :param sections: Section metadata
    :return: List of Rule objects
    """
    normalized_data = replace_nan_with_empty_string(raw_data)
    metadata = []
    benchmark_tag = benchmark_id.removeprefix("cis_").upper() if benchmark_id != "cis_k8s" else "Kubernetes"
    for rule in normalized_data.to_dict(orient="records"):
        # Check if rule was implemented
        if not rule_is_implemented(rule["Rule Number"], benchmark_id):
            continue

        benchmark_metadata = generate_rule_benchmark_metadata(
            benchmark_id,
            rule["Rule Number"],
        )
        r = Rule(
            id=str(
                uuid.uuid5(
                    uuid.NAMESPACE_DNS,
                    f"{benchmark_metadata.name} {rule['Title']} {rule['Rule Number']}",
                ),
            ),
            name=rule["Title"],
            profile_applicability=f"* {rule['profile_applicability']}",
            description=common.fix_code_blocks(rule["description"]),
            rationale=common.fix_code_blocks(rule.get("rationale", "")),
            audit=common.fix_code_blocks(rule.get("audit", "")),
            remediation=common.fix_code_blocks(rule.get("remediation", "")),
            impact=rule.get("impact", ""),
            default_value=rule.get(
                "default_value",
                read_existing_default_value(rule["Rule Number"], benchmark_id),
            ),
            references=parse_refs(rule.get("references", "")),
            section=sections[rule["Section"]],
            tags=[
                "CIS",
                benchmark_tag,
                f"CIS {rule['Rule Number']}",
                sections[rule["Section"]],
            ],
            version="1.0",
            benchmark=benchmark_metadata,
        )
        metadata.append(r)

    return metadata


def save_metadata(metadata: list[Rule], benchmark_id):
    """
    Save metadata to file
    :param metadata: List of Rule objects
    :param benchmark_id: Benchmark ID
    :return: None
    """
    for rule in metadata:
        rule_package = f"cis_{rule.benchmark.rule_number.replace('.', '_')}"
        rule_dir = os.path.join(common.rules_dir, f"{benchmark_id}/rules", rule_package)
        try:
            with open(os.path.join(rule_dir, "data.yaml"), "w+") as f:
                yml.dump({"metadata": common.apply_pss_recursively(asdict(rule))}, f)
        except FileNotFoundError:
            continue  # ignore rules that are not implemented


if __name__ == "__main__":
    os.chdir(os.path.join(common.repo_root.working_dir, "security-policies", "dev"))
    parser = argparse.ArgumentParser(
        description="CIS Benchmark parser CLI",
    )
    parser.add_argument(
        "-b",
        "--benchmark",
        default=common.benchmark.keys(),
        choices=common.benchmark.keys(),
        help="benchmark to be used for the rules metadata generation (default: all benchmarks). "
        "for example: `--benchmark cis_eks` or `--benchmark cis_eks cis_aws`",
        nargs="+",
    )
    parser.add_argument(
        "-r",
        "--rules",
        help="set of specific rules to be parsed (default: all rules).",
        nargs="+",
    )
    args = parser.parse_args()

    if type(args.benchmark) is str:
        args.benchmark = [args.benchmark]

    for benchmark_id in args.benchmark:
        print(f"### Processing {benchmark_id.replace('_', ' ').upper()}")

        # Parse Excel data
        raw_data, sections = common.parse_rules_data_from_excel(
            selected_columns=selected_columns_map,
            benchmark_id=benchmark_id,
            selected_rules=args.rules,
        )
        metadata = generate_metadata(benchmark_id, raw_data, sections)
        save_metadata(metadata, benchmark_id)
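# Example invocations (illustrative; the benchmark names below come from the argparse help
# text, while the rule numbers are hypothetical and assume the corresponding CIS Excel files
# referenced by common.benchmark exist):
#
#   python generate_rule_metadata.py                                # all benchmarks, all rules
#   python generate_rule_metadata.py --benchmark cis_eks cis_aws    # selected benchmarks
#   python generate_rule_metadata.py --benchmark cis_k8s --rules 1.1.1 1.1.2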