infra/enforcement/account_keys.py (302 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Compliance checker for service account keys stored in Secret Manager.

Compares the service accounts and managed secrets that exist in a GCP project
against the declarations in a YAML keys file, and can either report the
differences, announce them through a ``SendingClient``, or regenerate the
keys file from the live state.
"""

import argparse
import datetime
import logging
import os
import sys
from typing import Dict, List, Optional, Tuple, TypedDict

import yaml
from google.cloud import iam_admin_v1
from google.cloud import secretmanager
from google.cloud.iam_admin_v1 import types

from sending import SendingClient

# Value of the "created_by" label that marks a secret as managed.
SECRET_MANAGER_LABEL = "beam-infra-secret-manager"

# Managed secrets are named "<account_id>-key".
SECRET_KEY_SUFFIX = "-key"

# Configuration file read by config_process(), relative to the working directory.
CONFIG_FILE = "config.yml"

# License header written at the top of every generated YAML file.
_APACHE_LICENSE_HEADER = """# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""


class AuthorizedUser(TypedDict):
    """A user allowed to access a service account's managed secret."""

    email: str


class ServiceAccount(TypedDict):
    """One service account declaration in the keys file."""

    account_id: str
    display_name: str
    authorized_users: List[AuthorizedUser]


class ServiceAccountsConfig(TypedDict):
    """Top-level structure of the keys file."""

    service_accounts: List[ServiceAccount]


class AccountKeysPolicyComplianceCheck:
    """Checks live service accounts and managed secrets against the keys file."""

    def __init__(
        self,
        project_id: str,
        service_account_keys_file: str,
        logger: logging.Logger,
        sending_client: Optional[SendingClient] = None,
    ):
        """
        Stores the configuration and creates the Secret Manager and IAM clients.

        Args:
            project_id (str): The GCP project to inspect.
            service_account_keys_file (str): Path of the YAML keys file.
            logger (logging.Logger): Logger used for all output.
            sending_client (Optional[SendingClient]): Client used by the
                announcement actions; may be None for the other actions.
        """
        self.project_id = project_id
        self.service_account_keys_file = service_account_keys_file
        self.logger = logger
        self.sending_client = sending_client
        self.secret_client = secretmanager.SecretManagerServiceClient()
        self.service_account_client = iam_admin_v1.IAMClient()

    def _normalize_account_email(self, account_id: str) -> str:
        """
        Normalizes the account identifier to a full email format.

        Args:
            account_id (str): The unique identifier or email of the service account.

        Returns:
            str: The full service account email address.
        """
        if "@" in account_id:
            return account_id
        return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"

    def _denormalize_account_email(self, email: str) -> str:
        """
        Denormalizes the full service account email address to its unique identifier.

        Emails that do not belong to this project's IAM domain are returned
        unchanged.

        Args:
            email (str): The full service account email address.

        Returns:
            str: The unique identifier for the service account.
        """
        if email.endswith(f"@{self.project_id}.iam.gserviceaccount.com"):
            return email.split("@")[0]
        return email

    def _normalize_username(self, username: str) -> str:
        """
        Normalizes the username to a consistent format.

        Values that already start with "user:" are returned unchanged; anything
        else is stripped, lower-cased and prefixed with "user:".

        Args:
            username (str): The username to normalize.

        Returns:
            str: The normalized username.
        """
        if not username.startswith("user:"):
            return f"user:{username.strip().lower()}"
        return username

    def _denormalize_username(self, username: str) -> str:
        """
        Denormalizes the username from the consistent format.

        Args:
            username (str): The normalized username.

        Returns:
            str: The denormalized username.
        """
        if username.startswith("user:"):
            return username.split(":", 1)[1].strip().lower()
        return username

    def _secret_name_for_account(self, account_id: str) -> str:
        """Returns the managed secret name ("<id>-key") for a declared account id."""
        return f"{self._denormalize_account_email(account_id)}{SECRET_KEY_SUFFIX}"

    @staticmethod
    def _account_id_from_secret_name(secret_name: str) -> str:
        """
        Returns the account id encoded in a managed secret name.

        Only a trailing "-key" is removed. ``str.strip("-key")`` must not be
        used here: it treats its argument as a set of characters and would
        also eat leading/trailing '-', 'k', 'e' and 'y' from the account id.
        """
        if secret_name.endswith(SECRET_KEY_SUFFIX):
            return secret_name[: -len(SECRET_KEY_SUFFIX)]
        return secret_name

    @staticmethod
    def _declared_user_emails(account: ServiceAccount) -> List[str]:
        """Returns the sorted emails declared for an account (empty if none)."""
        # A YAML entry with a missing or empty "authorized_users" key loads as
        # absent/None; treat both as "no users declared".
        declared_users = account.get("authorized_users") or []
        return sorted(user["email"] for user in declared_users)

    def _get_all_live_service_accounts(self) -> List[str]:
        """
        Retrieves all service accounts that are currently active (not disabled) in the project.

        Returns:
            List[str]: A list of email addresses for all live service accounts.

        Raises:
            Exception: Any error raised by the IAM client, after it is logged.
        """
        request = types.ListServiceAccountsRequest()
        request.name = f"projects/{self.project_id}"

        try:
            # The client returns a pager. Iterating it walks every page, while
            # reading ".accounts" on the pager only exposes the first page.
            accounts = list(self.service_account_client.list_service_accounts(request=request))
        except Exception as e:
            self.logger.error(f"Failed to retrieve service accounts for project {self.project_id}: {e}")
            raise

        self.logger.debug(f"Retrieved {len(accounts)} service accounts for project {self.project_id}")
        if not accounts:
            self.logger.warning(f"No service accounts found in project {self.project_id}.")
            return []

        return [self._normalize_account_email(account.email) for account in accounts if not account.disabled]

    def _get_all_live_managed_secrets(self) -> List[str]:
        """
        Retrieves the list of secrets from the Secret Manager that were created by the beam-secret-service.

        A secret counts as managed when its "created_by" label equals
        SECRET_MANAGER_LABEL.

        Returns:
            List[str]: A list of secret ids.

        Raises:
            Exception: Any error raised by the Secret Manager client, after it is logged.
        """
        try:
            secrets = list(self.secret_client.list_secrets(request={"parent": f"projects/{self.project_id}"}))
            self.logger.debug(f"Retrieved {len(secrets)} secrets for project {self.project_id}")

            if not secrets:
                self.logger.warning(f"No secrets found in project {self.project_id}.")
                return []

            # secret.name is a full resource path; the id is its last segment.
            return [
                secret.name.split("/")[-1]
                for secret in secrets
                if "created_by" in secret.labels and secret.labels["created_by"] == SECRET_MANAGER_LABEL
            ]
        except Exception as e:
            self.logger.error(f"Failed to retrieve secrets for project {self.project_id}: {e}")
            raise

    def _get_all_secret_authorized_users(self, secret_id: str) -> List[str]:
        """
        Retrieves all members that hold the secretAccessor role on a secret.

        Args:
            secret_id (str): The ID of the secret to check access for.

        Returns:
            List[str]: The normalized ("user:"-prefixed) members authorized to access the secret.

        Raises:
            Exception: Any error raised while reading the IAM policy, after it is logged.
        """
        accessor_role = "roles/secretmanager.secretAccessor"
        resource_name = self.secret_client.secret_path(self.project_id, secret_id)

        try:
            policy = self.secret_client.get_iam_policy(request={"resource": resource_name})
            self.logger.debug(f"Retrieved IAM policy for secret '{secret_id}': {policy}")

            if not policy.bindings:
                self.logger.warning(f"No IAM bindings found for secret '{secret_id}'.")
                return []

            return [
                self._normalize_username(member)
                for binding in policy.bindings
                if binding.role == accessor_role
                for member in binding.members
            ]
        except Exception as e:
            self.logger.error(f"Failed to get IAM policy for secret '{secret_id}': {e}")
            raise

    def _read_service_account_keys(self) -> ServiceAccountsConfig:
        """
        Reads the service account declarations from the YAML keys file.

        A missing file, an empty file, or a file without a "service_accounts"
        entry all yield an empty configuration.

        Returns:
            ServiceAccountsConfig: The parsed configuration.

        Raises:
            IOError: If the file exists but cannot be read (logged first).
        """
        try:
            with open(self.service_account_keys_file, "r", encoding="utf-8") as file:
                keys = yaml.safe_load(file)

            if not keys or keys.get("service_accounts") is None:
                return {"service_accounts": []}
            return keys
        except FileNotFoundError:
            self.logger.info(
                f"Service account keys file {self.service_account_keys_file} not found, "
                "starting with empty configuration"
            )
            return {"service_accounts": []}
        except IOError as e:
            error_msg = f"Failed to read service account keys from {self.service_account_keys_file}: {e}"
            self.logger.error(error_msg)
            raise

    def _to_yaml_file(self, data: List[ServiceAccount], output_file: str, header_info: str = "") -> None:
        """
        Writes the service account declarations to a YAML file, preceded by
        the Apache license header and a generation timestamp.

        Args:
            data: The service account declarations to write.
            output_file: The file path where the YAML output will be written.
            header_info: A string containing the header information to be included in the YAML file.

        Raises:
            IOError: If the file cannot be written. The error is logged and
                re-raised so that callers do not report success for a file
                that was never written.
        """
        generated_at = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        header = f"{_APACHE_LICENSE_HEADER}\n# {header_info}\n# Generated on {generated_at} UTC\n\n"

        try:
            with open(output_file, "w", encoding="utf-8") as file:
                file.write(header)
                yaml_data = {"service_accounts": data}
                yaml.dump(yaml_data, file, sort_keys=False, default_flow_style=False, indent=2)
            self.logger.info(f"Successfully wrote Service Account Keys policy data to {output_file}")
        except IOError as e:
            self.logger.error(f"Failed to write to {output_file}: {e}")
            raise

    def check_compliance(self) -> List[str]:
        """
        Checks the compliance of service account keys with the defined policies.

        Three checks are performed: every live service account is declared,
        every managed secret is declared, and every declared account that has
        a managed secret lists exactly the users that can access it.

        Returns:
            List[str]: A list of compliance issue messages.
        """
        service_account_data = self._read_service_account_keys()
        file_service_accounts = service_account_data.get("service_accounts")

        if not file_service_accounts:
            file_service_accounts = []
            self.logger.info(f"No service account keys found in the {self.service_account_keys_file}.")

        compliance_issues: List[str] = []

        def report(msg: str) -> None:
            compliance_issues.append(msg)
            self.logger.warning(msg)

        # Check that all service accounts that exist are declared
        declared_ids = {account["account_id"] for account in file_service_accounts}
        for service_account in self._get_all_live_service_accounts():
            if self._denormalize_account_email(service_account) not in declared_ids:
                report(f"Service account '{service_account}' is not declared in the service account keys file.")

        managed_secrets = self._get_all_live_managed_secrets()
        managed_secret_names = set(managed_secrets)
        extracted_secrets = {self._secret_name_for_account(account["account_id"]) for account in file_service_accounts}

        # Check for managed secrets that are not declared
        for secret in managed_secrets:
            if secret not in extracted_secrets:
                report(f"Managed secret '{secret}' is not declared in the service account keys file.")

        # Check for each managed secret if it has the correct permissions
        for account in file_service_accounts:
            secret_name = self._secret_name_for_account(account["account_id"])
            if secret_name not in managed_secret_names:
                # Skip accounts that don't have managed secrets
                continue

            authorized_users = self._declared_user_emails(account)
            actual_users = sorted(
                self._denormalize_username(user) for user in self._get_all_secret_authorized_users(secret_name)
            )

            if authorized_users != actual_users:
                report(
                    f"Managed secret '{account['account_id']}' does not have the correct permissions. "
                    f"Expected: {authorized_users}, Actual: {actual_users}"
                )

        return compliance_issues

    def _compose_announcement(self, issues: List[str]) -> Tuple[str, str, str]:
        """
        Builds the texts shared by create_announcement and print_announcement.

        Args:
            issues (List[str]): The compliance issue messages to report.

        Returns:
            Tuple[str, str, str]: The title, the detailed body listing every
            issue, and the short announcement text.
        """
        title = "Account Keys Compliance Issue Detected"

        body = (
            f"Account keys for project {self.project_id} are not compliant with the defined policies "
            f"on {self.service_account_keys_file}\n\n"
        )
        body += "".join(f"- {issue}\n" for issue in issues)

        announcement = (
            "Dear team,\n\nThis is an automated notification about compliance issues detected in the "
            f"Account Keys policy for project {self.project_id}.\n\n"
        )
        announcement += f"We found {len(issues)} compliance issue(s) that need your attention.\n"
        announcement += (
            "\nPlease check the GitHub issue for detailed information and take appropriate action "
            "to resolve these compliance violations."
        )
        return title, body, announcement

    def create_announcement(self, recipient: str) -> None:
        """
        Creates an announcement about compliance issues using the SendingClient.

        Nothing is sent when the compliance check finds no issues.

        Args:
            recipient (str): The email address of the announcement recipient.

        Raises:
            ValueError: If no SendingClient was provided.
        """
        if not self.sending_client:
            raise ValueError("SendingClient is required for creating announcements")

        diff = self.check_compliance()
        if not diff:
            self.logger.info("No compliance issues found, no announcement will be created.")
            return

        title, body, announcement = self._compose_announcement(diff)
        self.sending_client.create_announcement(title, body, recipient, announcement)

    def print_announcement(self, recipient: str) -> None:
        """
        Prints announcement details instead of sending them (for testing purposes).

        Nothing is printed when the compliance check finds no issues.

        Args:
            recipient (str): The email address of the announcement recipient.

        Raises:
            ValueError: If no SendingClient was provided.
        """
        if not self.sending_client:
            raise ValueError("SendingClient is required for printing announcements")

        diff = self.check_compliance()
        if not diff:
            self.logger.info("No compliance issues found, no announcement will be printed.")
            return

        title, body, announcement = self._compose_announcement(diff)
        self.sending_client.print_announcement(title, body, recipient, announcement)

    def generate_compliance(self) -> None:
        """
        Modifies the service account keys file to match the current state of service accounts and secrets.

        Undeclared live service accounts and undeclared managed secrets are
        added with an empty user list. For accounts that already declare
        users, the list is replaced by the users that can actually access the
        secret. Duplicate account ids are dropped before writing.
        """
        service_account_data = self._read_service_account_keys()
        file_service_accounts = service_account_data.get("service_accounts") or []

        self.logger.info(f"Found {len(file_service_accounts)} existing service accounts in the keys file")

        # Check that all service accounts that exist are declared, if not, add them
        declared_ids = {account["account_id"] for account in file_service_accounts}
        for service_account in self._get_all_live_service_accounts():
            account_id = self._denormalize_account_email(service_account)
            if account_id in declared_ids:
                continue
            self.logger.info(
                f"Service account '{service_account}' is not declared in the service account keys file, adding it"
            )
            file_service_accounts.append({
                "account_id": account_id,
                "display_name": service_account,
                "authorized_users": [],
            })
            declared_ids.add(account_id)

        managed_secrets = self._get_all_live_managed_secrets()
        managed_secret_names = set(managed_secrets)
        extracted_secrets = {self._secret_name_for_account(account["account_id"]) for account in file_service_accounts}

        # Check for managed secrets that are not declared, if not, add them
        for secret in managed_secrets:
            if secret in extracted_secrets:
                continue
            self.logger.info(f"Managed secret '{secret}' is not declared in the service account keys file, adding it")
            account_id = self._account_id_from_secret_name(secret)
            file_service_accounts.append({
                "account_id": account_id,
                "display_name": self._normalize_account_email(account_id),
                "authorized_users": [],
            })

        # Check for each managed secret if it has the correct permissions
        for account in file_service_accounts:
            secret_name = self._secret_name_for_account(account["account_id"])
            if secret_name not in managed_secret_names:
                continue

            authorized_users = self._declared_user_emails(account)
            if not authorized_users:
                self.logger.info(f"Managed secret '{account}' is new, skipping permission check")
                continue

            actual_users = sorted(
                self._denormalize_username(user) for user in self._get_all_secret_authorized_users(secret_name)
            )

            if authorized_users != actual_users:
                self.logger.info(f"Managed secret '{account}' does not have the correct permissions, updating it")
                account["authorized_users"] = [{"email": user} for user in actual_users]

        # Remove duplicates based on account_id, keeping the first occurrence
        seen_accounts = set()
        deduplicated_accounts = []
        for account in file_service_accounts:
            if account["account_id"] in seen_accounts:
                self.logger.info(f"Removing duplicate entry for account '{account['account_id']}'")
                continue
            seen_accounts.add(account["account_id"])
            deduplicated_accounts.append(account)

        self._to_yaml_file(deduplicated_accounts, self.service_account_keys_file, header_info="Service Account Keys")


def config_process() -> Dict[str, str]:
    """
    Loads the settings from CONFIG_FILE and the environment.

    Project, logging, keys-file and action settings come from the YAML file;
    the SendingClient settings come from environment variables.

    Returns:
        Dict[str, str]: The resolved settings, with defaults applied.

    Raises:
        ValueError: If the configuration file is empty.
    """
    with open(CONFIG_FILE, "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    if not config:
        raise ValueError("Configuration file is empty or invalid.")

    # A "logging:" key with no children loads as None, so fall back to {}.
    logging_config = config.get("logging") or {}

    config_res = dict()

    config_res["project_id"] = config.get("project_id", "apache-beam-testing")
    config_res["logging_level"] = logging_config.get("level", "INFO")
    config_res["logging_format"] = logging_config.get("format", "[%(asctime)s] %(levelname)s: %(message)s")
    config_res["service_account_keys_file"] = config.get("service_account_keys_file", "../keys/keys.yaml")
    config_res["action"] = config.get("action", "check")

    # SendingClient configuration
    config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
    config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
    config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
    # Kept as a string like every other value; main() converts it with int().
    config_res["smtp_port"] = os.getenv("SMTP_PORT", "587")
    config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
    config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
    config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")

    return config_res


def main() -> int:
    """
    Command line entry point.

    Returns:
        int: 0 on success, 1 if the SendingClient could not be created, the
        action is unknown, or the action raised an exception.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Account Keys Compliance Checker")
    parser.add_argument(
        "--action",
        choices=["check", "announce", "print", "generate"],
        help="Action to perform: check compliance, create announcement, print announcement, "
             "or generate new compliance",
    )
    args = parser.parse_args()

    config = config_process()

    # Command line argument takes precedence over config file
    action = args.action if args.action else config.get("action", "check")

    logging.basicConfig(
        level=getattr(logging, config["logging_level"].upper(), logging.INFO),
        format=config["logging_format"],
    )
    logger = logging.getLogger("AccountKeysPolicyComplianceCheck")

    # Create SendingClient if needed for announcement actions
    sending_client = None
    if action in ["announce", "print"]:
        try:
            # Provide default values for testing, especially for print action
            github_token = config["github_token"] or "dummy-token"
            github_repo = config["github_repo"] or "dummy/repo"
            smtp_server = config["smtp_server"] or "dummy-server"
            smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
            email = config["email"] or "dummy@example.com"
            password = config["password"] or "dummy-password"

            sending_client = SendingClient(
                logger=logger,
                github_token=github_token,
                github_repo=github_repo,
                smtp_server=smtp_server,
                smtp_port=smtp_port,
                email=email,
                password=password,
            )
        except Exception as e:
            logger.error(f"Failed to initialize SendingClient: {e}")
            return 1

    logger.info(f"Starting Account Keys policy compliance check with action: {action}")
    account_keys_checker = AccountKeysPolicyComplianceCheck(
        config["project_id"],
        config["service_account_keys_file"],
        logger,
        sending_client,
    )

    try:
        if action == "check":
            compliance_issues = account_keys_checker.check_compliance()
            if compliance_issues:
                logger.warning("Account Keys policy compliance issues found:")
                for issue in compliance_issues:
                    logger.warning(issue)
            else:
                logger.info("Account Keys policy is compliant.")
        elif action == "announce":
            logger.info("Creating announcement for compliance violations...")
            recipient = config["recipient"] or "admin@example.com"
            account_keys_checker.create_announcement(recipient)
        elif action == "print":
            logger.info("Printing announcement for compliance violations...")
            recipient = config["recipient"] or "admin@example.com"
            account_keys_checker.print_announcement(recipient)
        elif action == "generate":
            logger.info("Generating new compliance based on current Account Keys policy...")
            account_keys_checker.generate_compliance()
        else:
            logger.error(f"Unknown action: {action}")
            return 1
    except Exception as e:
        logger.error(f"Error executing action '{action}': {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())