security-policies/dev/common.py:

import json
import os

import git
import pandas as pd
import regex as re
from ruamel.yaml.scalarstring import PreservedScalarString as pss

repo_root = git.Repo(".", search_parent_directories=True)
rules_dir = os.path.join(
    repo_root.working_dir,
    "security-policies/bundle/compliance",
)

CODE_BLOCK_SIZE = 100

negative_emoji = ":x:"  # ❌
positive_emoji = ":white_check_mark:"  # ✅

benchmark = {
    "cis_k8s": "CIS_Kubernetes_V1.23_Benchmark_v1.0.1.xlsx",
    "cis_eks": "CIS_Amazon_Elastic_Kubernetes_Service_(EKS)_Benchmark_v1.0.1.xlsx",
    "cis_aws": "CIS_Amazon_Web_Services_Foundations_Benchmark_v1.5.0.xlsx",
    "cis_gcp": "CIS_Google_Cloud_Platform_Foundation_Benchmark_v2.0.0.xlsx",
    "cis_azure": "CIS_Microsoft_Azure_Foundations_Benchmark_v2.0.0.xlsx",
}

relevant_sheets = {
    "cis_k8s": [
        "Level 1 - Master Node",
        "Level 2 - Master Node",
        "Level 1 - Worker Node",
        "Level 2 - Worker Node",
    ],
    "cis_eks": ["Level 1", "Level 2"],
    "cis_aws": ["Level 1", "Level 2"],
    "cis_gcp": ["Level 1", "Level 2"],
    "cis_azure": ["Level 1", "Level 2"],
}

default_selected_columns_map = {
    "cis_k8s": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Assessment Status": "Type",
    },
    "cis_eks": {
        "section #": "Section",
        "recommendation #": "Rule Number",
        "title": "Title",
        "scoring status": "Type",
    },
    "cis_aws": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Assessment Status": "Type",
    },
    "cis_gcp": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Assessment Status": "Type",
    },
    "cis_azure": {
        "Section #": "Section",
        "Recommendation #": "Rule Number",
        "Title": "Title",
        "Assessment Status": "Type",
    },
}


def status_emoji(positive):
    if positive:
        return positive_emoji
    return negative_emoji


def parse_rules_data_from_excel(
    benchmark_id,
    selected_columns=None,
    selected_rules=None,
):
    """
    Parse rules data from an Excel file for the current service.

    :param benchmark_id: Benchmark ID
    :param selected_columns: Dictionary with columns to select from the sheet
    :param selected_rules: List of rules to parse
    :return: Tuple of a pandas DataFrame with the rules data and a dict
             mapping section numbers to section titles
    """
    if selected_columns is None:
        selected_columns = default_selected_columns_map
    benchmark_name = benchmark[benchmark_id]
    input_path = f"input/{benchmark_name}"
    sheets = relevant_sheets[benchmark_id]
    rules_data = pd.DataFrame()
    sections_df = pd.DataFrame()
    for sheet_name in sheets:
        print(f"Processing sheet '{sheet_name}'")
        excel_file = pd.read_excel(input_path, sheet_name=sheet_name)

        # Select only the columns to include in the Markdown table
        data = excel_file[selected_columns[benchmark_id].keys()]

        # Update table headers
        data.columns = selected_columns[benchmark_id].values()

        # Rows with an empty "Rule Number" hold section information; keep them
        # separately, converted to strings
        sections_curr_sheet = data.loc[
            data["Rule Number"].isna(),
            ["Section", "Title"],
        ].astype(str)

        # Filter out the section rows and convert the rest to strings
        data = data[data["Rule Number"].notna()].astype(str)

        # Only keep the rules that are selected
        if selected_rules is not None:
            data = data[data["Rule Number"].isin(selected_rules)]

        # Add a new column with the sheet name
        data = data.assign(profile_applicability=sheet_name)

        rules_data = (
            pd.concat([rules_data, data])
            .drop_duplicates(subset="Rule Number")
            .reset_index(drop=True)
        )
        sections_df = (
            pd.concat([sections_df, sections_curr_sheet])
            .drop_duplicates(subset="Section")
            .reset_index(drop=True)
        )

    sections = {section: title for section, title in sections_df.values}
    return rules_data, sections
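
# A minimal usage sketch, assuming the CIS workbook named in `benchmark` has
# been downloaded into a local `input/` directory; the rule numbers below are
# hypothetical examples, not taken from the benchmark files:
#
#   rules_df, sections = parse_rules_data_from_excel(
#       "cis_aws",
#       selected_rules=["1.8", "1.9"],  # hypothetical rule numbers
#   )
#   print(rules_df[["Rule Number", "Title", "profile_applicability"]])
#   print(sections)  # e.g. {"1": "<section title>", ...}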


def check_and_fix_numbered_list(text):
    # Split the text into lines
    lines = text.split("\n")

    # Find the lines that start with a number and a period, and store their indices
    numbered_lines = [(i, line) for i, line in enumerate(lines) if re.match(r"^\d+\.", line)]

    # Check that the numbered lines are consecutively numbered
    for i, (index, line) in enumerate(numbered_lines):
        # Extract the number from the line
        line_number = int(line.split(".")[0])

        # If the line number is not correct, replace it with the expected one
        if line_number != i + 1:
            corrected_line = f"{i + 1}. {line.removeprefix(str(line_number) + '. ')}"
            lines[index] = corrected_line

    # Join the lines back into a single string and return the result
    return "\n".join(lines)


def add_new_line_after_period(text):
    # Split the text into lines
    lines = text.split("\n")

    # Find the lines that start with a number and a period
    numbered_lines = [line for line in lines if re.match(r"^\d+\.", line)]

    # Add a new line after each period, unless the line is a numbered list item
    for i, line in enumerate(lines):
        if line not in numbered_lines:
            lines[i] = line.replace(". ", ".\n")

    # Join the lines back into a single string and return the result
    return "\n".join(lines)
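
# A minimal sketch of what the two helpers above do, on a made-up snippet:
#
#   text = "1. First step\n3. Second step\nRun the command. Check the output."
#   add_new_line_after_period(text)
#   #   -> "1. First step\n3. Second step\nRun the command.\nCheck the output."
#   check_and_fix_numbered_list(text)
#   #   -> "1. First step\n2. Second step\nRun the command. Check the output."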
", ".\n") # Join the lines back into a single string and return the result return "\n".join(lines) def format_json_in_text(text): try: # Search for JSON-like content in the text start_index = text.find("{") end_index = text.rfind("}") + 1 json_str = text[start_index:end_index] # Try to load and format the JSON parsed_json = json.loads(json_str) formatted_json = json.dumps(parsed_json, indent=4) # Replace the original JSON string in the text with the formatted one formatted_text = text[:start_index] + formatted_json + text[end_index:] return formatted_text except: # If JSON extraction or formatting fails, return the original text return text def fix_code_blocks(text: str): text = add_new_line_after_period(text) text = format_json_in_text(text) return check_and_fix_numbered_list(text) def apply_pss_recursively(data): if isinstance(data, dict): return {key: apply_pss_recursively(value) for key, value in data.items()} elif isinstance(data, list): return [value for value in data] elif isinstance(data, str): return pss(data) if len(data) > CODE_BLOCK_SIZE else data else: return data