security-policies/dev/generate_rule_templates.py (102 lines of code) (raw):

import argparse import json import os from pathlib import Path import common import yaml INTEGRATION_RULE_TEMPLATE_DIR = "../../../integrations/packages/cloud_security_posture/kibana/csp_rule_template/" def generate_rule_templates( benchmark: str, selected_rules: list, rule_template_dir: str, ): """ Generate rule templates from existing rules """ rules_templates = [] benchmark_rules_dir = f"../bundle/compliance/{benchmark}/rules/" rules = os.listdir(benchmark_rules_dir) # if no rules are selected, generate all rules if not selected_rules: selected_rules = rules for rule in rules: # supporting both cis_1_1_1 and 1.1.1 formats if rule not in selected_rules and rule.removeprefix("cis_").replace("_", ".") not in selected_rules: continue with open( os.path.join(benchmark_rules_dir, f"{rule}/data.yaml"), "r", encoding="utf-8", ) as f: rule_obj = yaml.safe_load(f.read())["metadata"] rule_obj["rego_rule_id"] = rule rule_template = migrate_csp_rule_metadata( { "id": rule_obj["id"], "type": "csp-rule-template", "attributes": rule_obj, }, ) rules_templates.append(rule_template) # Write templates into file print(f"Processed {len(rules_templates)} rules for {benchmark} benchmark") save_rule_templates(rules_templates, rule_template_dir) def save_rule_templates(rule_templates: list[dict], rule_template_dir: str): """ Save rule templates to file """ if not rule_template_dir: rule_template_dir = INTEGRATION_RULE_TEMPLATE_DIR # ensure path exist, else create it Path(rule_template_dir).mkdir(parents=True, exist_ok=True) for rule_template in rule_templates: with open( os.path.join(rule_template_dir, f"{rule_template['id']}.json"), "w", ) as f: json.dump(rule_template, f, indent=4) def migrate_csp_rule_metadata(doc: dict) -> dict: """ Migrate rule metadata to integration format """ attributes = doc["attributes"] print(f"Processing {attributes['benchmark']['rule_number']}") metadata = { "impact": attributes.pop("impact", None), "default_value": attributes.pop("default_value", None), "references": attributes.pop("references", None), **attributes, } return { **doc, "attributes": { "metadata": metadata, }, "migrationVersion": { "csp-rule-template": "8.7.0", }, "coreMigrationVersion": "8.7.0", } if __name__ == "__main__": os.chdir(os.path.join(common.repo_root.working_dir, "security-policies", "dev")) parser = argparse.ArgumentParser( description="CIS Benchmark Rules Templates Generator CLI", ) parser.add_argument( "-b", "--benchmark", default=common.benchmark.keys(), choices=common.benchmark.keys(), help="benchmark to be used for the rules template generation (default: all benchmarks). " "for example: `--benchmark cis_eks` or `--benchmark cis_eks cis_aws`", nargs="+", ) parser.add_argument( "-r", "--rules", default=[], help="set of specific rules to be parsed (default: all rules)." "for example: `--rules 1.1 1.2` or `--rules cis_1_1 cis_1_2`", nargs="+", ) parser.add_argument( "-o", "--out", help=f"output directory for the generated rules templates (default: {INTEGRATION_RULE_TEMPLATE_DIR}).", default=INTEGRATION_RULE_TEMPLATE_DIR, ) args = parser.parse_args() if type(args.benchmark) is str: args.benchmark = [args.benchmark] if type(args.rules) is str: args.rules = [args.rules] for benchmark_id in args.benchmark: print(f"### Processing {benchmark_id.replace('_', ' ').upper()}") generate_rule_templates(benchmark_id, args.rules, args.out)