security-policies/dev/update_rule_status.py (188 lines of code) (raw):
import importlib
import importlib.util
import os
import sys
from pkgutil import iter_modules
import common
"""
Generates Markdown tables with implemented rules status for all services.
"""
colalign = {
"Rule Number": "center",
"Section": "left",
"Description": "left",
"Status": "center",
"Type": "center",
}
test_cases_variable_name = "test_cases"
test_cases_module_name = "product.tests.data"
def get_integration_test_cases(benchmark_id):
"""
Given a benchmark_id looks up all the submodules, and retrieve the map declared as test_cases
:param benchmark_id: benchmark to look for integration tests
:return: Dictionary of test cases
"""
root_module_name = f"{test_cases_module_name}.{get_provider(benchmark_id)}"
if importlib.util.find_spec(root_module_name) is None:
# No test data found
return dict()
root_module = importlib.import_module(root_module_name)
modules = iter_modules(root_module.__path__)
tcs = dict()
for module in modules:
submodule = importlib.import_module("." + module.name, root_module.__name__)
if not hasattr(submodule, test_cases_variable_name):
print("Could not find test_cases in ", root_module.__name__, module.name)
continue
test_cases = getattr(submodule, test_cases_variable_name)
for _, tc in test_cases.items():
if test_cases_module_name not in type(tc).__module__ or getattr(tc, "rule_tag") is None:
continue
rule_number = tc.rule_tag.replace("CIS ", "")
if rule_number not in tcs:
tcs[rule_number] = {
"passed": False,
"failed": False,
}
tcs[rule_number][tc.expected] = True
return tcs
def generate_integration_test_cel(integration_tests, rule):
"""
Get formatted integration test status
:param integration_tests: all the integration tests
:param rule: which rule to look for
:return: pretty string with Passed Failed status
"""
passed = False
failed = False
if rule in integration_tests:
passed = integration_tests[rule]["passed"]
failed = integration_tests[rule]["failed"]
return f"""Passed {common.status_emoji(passed)} / Failed {common.status_emoji(failed)}
"""
def get_implemented_rules(all_rules, benchmark_id):
"""
Get list of implemented rules in the repository for current service.
:param all_rules: List of all rules for specified benchmark
:param benchmark_id: Benchmark ID
:return: List of implemented rules
"""
# Set all rules as not implemented by default
implemented_rules = {str(rule): common.negative_emoji for rule in all_rules}
# Construct path to rules directory for current service
rules_dir = os.path.join("../bundle", "compliance", f"{benchmark_id}", "rules")
# Get list of all rule files in the rules directory
rule_files = os.listdir(rules_dir)
# Iterate over all rule files
for rule_file in rule_files:
# Extract rule number from rule file name
rule_number = rule_file.removeprefix("cis_").replace("_", ".")
# Set rule as implemented
implemented_rules[rule_number] = common.positive_emoji
return implemented_rules
def generate_md_table(benchmark_id):
"""
Generate Markdown table with implemented rules status for current service.
:param benchmark_id: Benchmark ID
:return: Markdown table
"""
rules_data, sections = common.parse_rules_data_from_excel(benchmark_id)
# Rename "Title" column to "Description"
rules_data.rename(columns={"Title": "Description"}, inplace=True)
# Get list of all rules in sheet
all_rules = rules_data["Rule Number"].to_list()
# Get list of implemented rules
implemented_rules = get_implemented_rules(all_rules, benchmark_id)
# Get integration tests for benchmark
test_cases = get_integration_test_cases(benchmark_id)
# Add implemented rules' and Integration Tests column to the data
for rule, total_status in implemented_rules.items():
rules_data.loc[rules_data["Rule Number"] == rule, "Status"] = total_status
rules_data.loc[rules_data["Rule Number"] == rule, "Integration Tests"] = generate_integration_test_cel(
test_cases,
rule,
)
rules_data["Section"] = rules_data["Section"].apply(
lambda section_id: sections[section_id],
)
new_order = ["Rule Number", "Section", "Description", "Status", "Integration Tests", "Type"]
rules_data = rules_data.reindex(columns=new_order)
rules_data = rules_data.sort_values("Rule Number")
rules_data["Rule Number"] = rules_data["Rule Number"].apply(
get_rule_path,
benchmark_id=benchmark_id,
implemented_rules=implemented_rules,
)
# Convert DataFrame to Markdown table
table = rules_data.to_markdown(
index=False,
tablefmt="pipe",
colalign=colalign.values(),
)
# Add table title
total_rules, total_implemented, total_status = total_rules_status(rules_data)
total_automated, automated_implemented, automated_status = automated_rules_status(
rules_data,
)
total_manual, manual_implemented, manual_status = manual_rules_status(rules_data)
total_expected_tests, implemented_tests, test_status = integration_test_status(all_rules, test_cases)
description = f"### {total_implemented}/{total_rules} implemented rules ({total_status:.0%})\n\n"
description += f"#### Automated rules: {automated_implemented}/{total_automated} ({automated_status:.0%})\n\n"
description += f"#### Manual rules: {manual_implemented}/{total_manual} ({manual_status:.0%})\n\n"
description += (
f"#### Integration Tests Coverage: {implemented_tests}/{total_expected_tests} ({test_status:.0%})\n\n"
)
total_percentage = total_status * 100
return table, description, total_percentage
def total_rules_status(rules_data):
"""
Get number of total rules and number of implemented rules.
:param rules_data: Rules data
:return: Number of total rules and number of implemented rules
"""
implemented_rules = rules_data[rules_data["Status"] == common.positive_emoji]
status = len(implemented_rules) / len(rules_data)
return len(rules_data), len(implemented_rules), status
def integration_test_status(all_rules, test_cases):
"""
Calculates the coverage percentage
:param all_rules: all the expected rules
:param test_cases: all the test cases
:return: total expected cases, total test cases and the coverage percentage
"""
total_expected_test_cases = len(all_rules) * 2 # twice because we expect a passed and failed test case per rule
total_test_cases = 0
for rule in all_rules:
if rule not in test_cases:
continue
total_test_cases += 1 if test_cases[rule]["passed"] else 0
total_test_cases += 1 if test_cases[rule]["failed"] else 0
return total_expected_test_cases, total_test_cases, total_test_cases / total_expected_test_cases
def automated_rules_status(rules_data):
"""
Get number of automated rules and number of implemented automated rules.
:param rules_data: Rules data
:return: Number of automated rules and number of implemented automated rules
"""
automated_rules = rules_data[rules_data["Type"] == "Automated"]
automated_implemented = automated_rules[automated_rules["Status"] == common.positive_emoji]
status = len(automated_implemented) / len(automated_rules)
return len(automated_rules), len(automated_implemented), status
def manual_rules_status(rules_data):
"""
Get number of manual rules and number of implemented manual rules.
:param rules_data: Rules data
:return: Number of manual rules and number of implemented manual rules
"""
manual_rules = rules_data[rules_data["Type"] == "Manual"]
manual_implemented = manual_rules[manual_rules["Status"] == common.positive_emoji]
status = len(manual_implemented) / len(manual_rules)
return len(manual_rules), len(manual_implemented), status
def get_rule_path(rule, benchmark_id, implemented_rules):
"""
Get rule path for specified rule and service.
:param implemented_rules: ‘Implemented’ column values
:param rule: Rule number
:param benchmark_id: Benchmark ID
:return: Rule path in the repository
"""
if implemented_rules[rule] == common.positive_emoji:
return f"[{rule}](bundle/compliance/{benchmark_id}/rules/cis_{rule.replace('.', '_')})"
else:
return rule
def update_main_readme_status_badge(percentage, service):
"""
Update status badge in the main README file.
:param percentage: Percentage of implemented rules
:param service: Service name (k8s, eks, aws)
"""
readme_path = "../README.md"
badge_api = "https://img.shields.io/badge"
with open(readme_path, "r+") as f:
readme = f.readlines()
if service == "k8s":
badge = (
f"[-326CE5?"
f"logo=Kubernetes)](RULES.md#k8s-cis-benchmark)\n"
)
elif service == "eks":
badge = (
f"[-FF9900?"
f"logo=Amazon+EKS)](RULES.md#eks-cis-benchmark)\n"
)
elif service == "aws":
badge = (
f"[-232F3E?"
f"logo=Amazon+AWS)](RULES.md#aws-cis-benchmark)\n"
)
elif service == "gcp":
badge = (
f"[-4285F4?"
f"logo=Google+Cloud)](RULES.md#gcp-cis-benchmark)\n"
)
elif service == "azure":
badge = (
f"[-0078D4?"
f"logo=Microsoft+Azure)](RULES.md#azure-cis-benchmark)\n"
)
badge_line = get_badge_line_number(readme, service)
readme[badge_line] = badge
f.seek(0)
f.truncate()
f.writelines(readme)
def get_badge_line_number(readme, service):
"""
Get line number of the status badge in the main README file.
:param readme: Main README file
:param service: Service name (k8s, eks, aws)
:return: Line number
"""
for i, line in enumerate(readme):
if line.startswith(f"[![CIS {service.upper()}]"):
return i
def generate_table_of_contents():
return """
## Table of Contents\n
- [Kubernetes CIS Benchmark](#k8s-cis-benchmark)
- [Amazon EKS CIS Benchmark](#eks-cis-benchmark)
- [Amazon AWS CIS Benchmark](#aws-cis-benchmark)
- [Google Cloud CIS Benchmark](#gcp-cis-benchmark)
- [Microsoft Azure CIS Benchmark](#azure-cis-benchmark)"""
def get_provider(benchmark_id):
return benchmark_id.removeprefix("cis_")
if __name__ == "__main__":
# Set working directory to the dev directory
os.chdir(os.path.join(common.repo_root.working_dir, "security-policies", "dev"))
# Allow to import from tests folder to fetch Integration tests data
sys.path.append("../../tests/")
# Write Markdown table to file
with open("../RULES.md", "w") as f:
f.write(f"# Rules Status\n")
table_of_contents = generate_table_of_contents()
f.write(table_of_contents)
for benchmark_id in common.benchmark.keys():
print(f"Generating Markdown table for '{benchmark_id}' service")
benchmark_title = f"{get_provider(benchmark_id).upper()} CIS Benchmark"
f.write(f"\n\n## {benchmark_title}\n\n")
table, description, percentage = generate_md_table(benchmark_id)
f.write(description)
f.write(f"<details><summary><h3>Full Table 📋</h3></summary>\n\n")
f.write(table)
f.write("\n</details>")
update_main_readme_status_badge(
percentage=percentage,
service=benchmark_id.removeprefix("cis_"),
)
f.write("\n")