infra/iam/migrate_roles.py (196 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # This script is used to export the IAM policy of a Google Cloud project to a YAML format. # It retrieves the IAM policy bindings, parses the members, and formats the output in a structured # YAML format, excluding service accounts and groups. The output includes usernames, emails, and # their associated permissions, with optional conditions for roles that have conditions attached. # You need to have the Google Cloud SDK installed and authenticated to run this script. import argparse import os import sys import yaml import roles.generate_roles as generate_roles from generate import export_project_iam, to_yaml_file from google.cloud.iam_admin_v1 import GetRoleRequest, IAMClient def migrate_permissions(data: list) -> list: """ Migrates permissions from the permissions to the new roles defined on beam_roles/ directory. The rules are: - If the user has owner role, leave it as is, remove any other role as it is redundant. - If the user has any admin or secret related role, it will be migrated to the beam_admin role. - If the user has an editor role or any user role but not an admin or secret related role, it will be migrated to the beam_infra_manager role. - If the user has a role that is not only viewer, it will be migrated to the beam_committer role. - The users with just viewer roles will be migrated to the beam_viewer role. The rules are in a hierarchical order, meaning that if a user has a high role, it will also have the lower roles. Args: data: A list of dictionaries containing user permissions and details. Returns: A list of dictionaries with migrated permissions. """ migrated_data = [] for item in data: username = item["username"] email = item["email"] permissions = item["permissions"] # Initialize the new roles new_roles = { "beam_owner": False, "beam_admin": False, "beam_infra_manager": False, "beam_committer": False, "beam_viewer": False } for permission in permissions: role = permission["role"] # If the role is 'roles/owner', it is considered an owner role. if role == "roles/owner": new_roles["beam_owner"] = True # If it ends with 'admin' or containes 'secretmanager' in the role, it is considered an admin role. Case insensitive. elif 'admin' in role.lower() or 'secretmanager' in role.lower(): new_roles["beam_admin"] = True new_roles["beam_infra_manager"] = True new_roles["beam_committer"] = True new_roles["beam_viewer"] = True # If it is an editor role, it will be migrated to the beam_infra_manager. elif role == "roles/editor": new_roles["beam_infra_manager"] = True new_roles["beam_committer"] = True new_roles["beam_viewer"] = True elif role != "roles/viewer": # If it is a role that is not only viewer, it will be migrated to the beam_committer role. new_roles["beam_committer"] = True new_roles["beam_viewer"] = True # If it is a viewer role, it will be migrated to the beam_viewer role. else: new_roles["beam_viewer"] = True # Create the migrated entry migrated_entry = { "username": username, "email": email, "permissions": [] } if new_roles["beam_owner"]: migrated_entry["permissions"].append({"role": "roles/owner"}) else: if new_roles["beam_admin"]: migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_admin"}) if new_roles["beam_infra_manager"]: migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_infra_manager"}) if new_roles["beam_committer"]: migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_committer"}) if new_roles["beam_viewer"]: migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_viewer"}) migrated_data.append(migrated_entry) return migrated_data def get_gcp_role_permissions(role_id: str) -> list: """ Retrieves the permissions associated to a google cloud role. Args: project_id: The ID of the Google Cloud project. role_id: The name of the role to retrieve permissions for. Returns: A list of permissions associated with the specified role. """ client = IAMClient() request = GetRoleRequest(name=role_id) role = client.get_role(request=request) return list(role.included_permissions) def get_roles_from_file(file_path: str) -> list: """ Reads a YAML file containing roles and returns a list of dictionaries with user data. Args: file_path: The path to the YAML file containing roles. Returns: A list of dictionaries with user data. """ with open(file_path, 'r') as file: data = yaml.safe_load(file) roles = [] for role in data: email = role.get("email") username = role.get("username") permissions = role.get("permissions", []) roles.append({ "email": email, "username": username, "permissions": permissions }) return roles def permission_differences(project_id: str, user_email: str) -> list: """ Generates a list of differences between the original and migrated permissions for a user. It gets the permission from the generated files, so it is expected that the files are already generated and up to date. Args: project_id: The ID of the Google Cloud project. user_email: The email of the user to compare permissions for. Returns: A list of dictionaries containing the differences in permissions for the specified user. """ cache = {} user_differences = {} original = get_roles_from_file(f"{project_id}.original-roles.yaml") migrated = get_roles_from_file(f"{project_id}.migrated-roles.yaml") # Get the permissions on the beam_roles beam_roles = generate_roles.get_roles() for role_name, role_data in beam_roles.items(): permissions = role_data["permissions"] cache[role_name] = permissions # Get the permissions for the original roles for user in original: username = user["username"] email = user["email"] # Skip if the user email does not match the specified user_email if user_email and email != user_email: continue original_roles = user["permissions"] original_permissions = [] for role in original_roles: if '_withcond_' in role['role']: # Skip roles with conditions, as they are not supported in the new roles continue if 'organizations/' in role['role']: # Skip organization roles, as they are not supported in the new roles continue if role['role'] not in cache: permissions = get_gcp_role_permissions(role["role"]) cache[role['role']] = sorted(permissions) original_permissions.extend(cache[role['role']]) # Initialize the user differences entry user_differences[username] = { "email": email, "original_roles": original_roles, "original_permissions": sorted(original_permissions), "migrated_roles": [], "migrated_permissions": [], "differences": [] } # Get the permissions for the migrated roles for user in migrated: username = user["username"] email = user["email"] # Skip if the user email does not match the specified user_email if user_email and email != user_email: continue migrated_roles = user["permissions"] migrated_permissions = [] for role in migrated_roles: full_role_name = role["role"] # Owner is a special case, it should not be migrated to any other role. if "roles/owner" in full_role_name: migrated_permissions.extend(get_gcp_role_permissions(full_role_name)) else: role_name = full_role_name.split('roles/')[1] migrated_permissions.extend(cache[role_name]) user_differences[username]["migrated_roles"] = migrated_roles user_differences[username]["migrated_permissions"] = sorted(migrated_permissions) # Compare original and migrated permissions differences_list = [] for username, user_data in user_differences.items(): original_permissions = user_data["original_permissions"] migrated_permissions = user_data["migrated_permissions"] # Find differences in permissions original_set = set(original_permissions) migrated_set = set(migrated_permissions) added_permissions = migrated_set.difference(original_set) removed_permissions = original_set.difference(migrated_set) if added_permissions or removed_permissions: differences = { "username": username, "email": user_data["email"], "added_permissions": sorted(list(added_permissions)), "removed_permissions": sorted(list(removed_permissions)) } differences_list.append(differences) return differences_list def main(): """ Main function to run the script. This function parses command-line arguments to either export IAM policies or generate permission differences for a specified GCP project. """ parser = argparse.ArgumentParser( description="Export IAM policies or generate permission differences for a GCP project." ) parser.add_argument( "project_id", help="The Google Cloud project ID." ) parser.add_argument( "--difference", dest="user_email", metavar="USER_EMAIL", help="Generate permission differences for the specified user email." ) args = parser.parse_args() project_id = args.project_id user_email = args.user_email if user_email: # If the iam policy has not been generated yet, it will generate the original IAM policy first. if not os.path.exists(f"{project_id}.original-roles.yaml") or not os.path.exists(f"{project_id}.migrated-roles.yaml"): print(f"Original IAM policy for project {project_id} not found. Generating original and migrated roles first.") print(f"Exporting IAM policy for project {project_id}...") iam_data = export_project_iam(project_id) original_filename = f"{project_id}.original-roles.yaml" original_header = f"Exported original IAM policy for project {project_id}" to_yaml_file(iam_data, original_filename, header_info=original_header) print("Migrating permissions to new roles...") migrated_data = migrate_permissions(iam_data) migrated_filename = f"{project_id}.migrated-roles.yaml" migrated_header = f"Migrated IAM policy for project {project_id} to new beam_roles" to_yaml_file(migrated_data, migrated_filename, header_info=migrated_header) print(f"Generated {original_filename} and {migrated_filename}") print(f"Generating permission differences for {user_email} in project {project_id}...") differences = permission_differences(project_id, user_email) if differences: output_filename = f"{project_id}.permission-differences.yaml" header = f"Permission differences for user {user_email} in project {project_id}" to_yaml_file(differences, output_filename, header_info=header) print(f"Generated {output_filename}") else: print(f"No permission differences found for user {user_email} in project {project_id}.") else: print(f"Exporting IAM policy for project {project_id}...") iam_data = export_project_iam(project_id) original_filename = f"{project_id}.original-roles.yaml" original_header = f"Exported original IAM policy for project {project_id}" to_yaml_file(iam_data, original_filename, header_info=original_header) print("Migrating permissions to new roles...") migrated_data = migrate_permissions(iam_data) migrated_filename = f"{project_id}.migrated-roles.yaml" migrated_header = f"Migrated IAM policy for project {project_id} to new beam_roles" to_yaml_file(migrated_data, migrated_filename, header_info=migrated_header) print(f"Generated {original_filename} and {migrated_filename}") print(f"To generate permission differences, run: python {sys.argv[0]} {project_id} --difference <user_email>") if __name__ == "__main__": main()