tools/iam-permissions-copier/iam.py (219 lines of code) (raw):

# Copyright 2022 Google LLC # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import click import json import re import csv import time import google.auth import googleapiclient.discovery try: from constants import ( ALL_RESOURCES_IN_PROCESSING_ORDER, MATCHER_EXPRESSION, FORMAT_MATCHER, ) except: pass from table_logger import TableLogger from inventory import cai log_headers = [ "resource_type", "mapping_method", "resource", "role", "member_to_copy", "member_copy_result", ] table_output = TableLogger( columns=",".join(log_headers), colwidth={ "resource_type": 20, "mapping_method": 20, "resource": 20, "role": 30, }, default_colwidth=50, ) def look_for_gcloud_org(): click.secho( "No --org-id provided. Trying gcloud config...", ) try: creds, project_id = google.auth.default() client = googleapiclient.discovery.build( serviceName="cloudresourcemanager", version="v1", cache_discovery=False, ) req = client.projects().getAncestry(projectId=project_id) resp = req.execute() find_org = next( filter( lambda r: r["resourceId"]["type"] == "organization", resp["ancestor"], ) ) org_id = find_org["resourceId"]["id"] click.secho( "Using {id} from gcloud config.".format(id=org_id), ) return org_id except: raise SystemExit( ( 'Could not determine org id from gcloud config. Please either ' 'pass in the org id via --org-id or ensure your current gcloud ' 'configuration has an active project set.' ) ) def parse_csv(file): with open(file, mode="r") as inp: reader = csv.reader(inp) next(reader, None) return {rows[0]: rows[1] for rows in reader} @click.group() def cli(): pass @cli.command() @click.option("--org-id") def generate_inventory_file(org_id): org_id_to_use = org_id if org_id else look_for_gcloud_org() cai.fetch_cai_file(org_id_to_use) def should_keep_fix(member, manual_map, existing_bindings): match = re.match(MATCHER_EXPRESSION, member) new_member = None mapping_type = None if match: new_member = FORMAT_MATCHER(match) mapping_type = "Dynamic" strip_user = member.replace("user:", "") if strip_user in manual_map: new_member = "user:{email}".format(email=manual_map[strip_user]) mapping_type = "Manual" if new_member in existing_bindings: return None if new_member and mapping_type: return [new_member, mapping_type] return None def execute_iam_copy(resources, dry_run, verify_permissions): timestamp = int(time.time()) filename = "out-{timestamp}.csv".format(timestamp=timestamp) f = open(filename, "a+", encoding="UTF8", newline="") writer = csv.writer(f) writer.writerow(log_headers) for (instance, bindings) in resources: for binding in bindings: iam_migrator = instance( binding["resource"], binding["role"], binding["new_member"], dry_run, ) if verify_permissions: iam_migrator.verify_permissions() iam_migrator.migrate() writer.writerow( [ binding["type"], binding["mapping_type"], binding["resource"], binding["role"], binding["old_member"], binding["new_member"], ] ) click.secho( "Script Complete. {filename} created with output.".format( filename=filename ), ) @cli.command() @click.option( "--filename", default=None, ) @click.option("--dry-run", default=False) @click.option("--map-file", default=None) @click.option("--org-id", envvar="ORG_ID") @click.option("--verify-permissions", default=True) def run(filename, dry_run, map_file, org_id, verify_permissions): org_id = org_id if org_id else look_for_gcloud_org() if not map_file: click.secho( ( 'Notice: No manual mapper provided. To provide one ' 'set the --map-file parameter.\n' ), fg="yellow", ) if not filename: click.secho( ( 'Notice: No filename provided. To provide one set the ' '--filename parameter. Fetching inventory file...\n' ), fg="yellow", ) manual_map = parse_csv(map_file) if map_file else {} assets = [] asset_types = [] file_to_open = filename if filename else cai.fetch_cai_file(org_id) f = open(file_to_open) cai_data = json.load(f) for resource in ALL_RESOURCES_IN_PROCESSING_ORDER: filter_resources = list( filter( lambda r: resource.ASSET_TYPE == r["assetType"], cai_data, ) ) click.secho( "Processing {count} resources of type {type}...".format( count=len(filter_resources), type=resource.ASSET_TYPE ), fg="blue", ) new_assets = [] for res in filter_resources: for binding in res["policy"]["bindings"]: for member in binding["members"]: should_fix_member = should_keep_fix( member, manual_map, binding["members"] ) if should_fix_member is not None: asset = { "type": resource.ASSET_TYPE.split( "googleapis.com/" )[1], "mapping_type": should_fix_member[1], "resource": res["resource"], "role": binding["role"], "old_member": member, "new_member": should_fix_member[0], } new_assets.append(asset) assets.extend(new_assets) # storing assets with the coresponding resource class to process later on if len(new_assets) > 0: asset_types.append((resource, new_assets)) click.secho( "Found {count} tainted iam permissions on resource {type}... \n".format( count=len(new_assets), type=resource.ASSET_TYPE ), fg="yellow", ) click.secho( "{count} total permissions to be copied".format(count=len(assets)), fg="green", bg="black", ) for a in assets: table_output(*a.values()) if dry_run: click.secho( "RUNNING AS DRY RUN. NO ACTUAL PERMISSIONS WILL BE TOUCHED.", fg="black", bg="green", ) else: click.secho( ( '\n\nThis operation will copy the tainted iam permissions. ' 'There is no reversal operation. \n' ), fg="red", ) if click.confirm("Are you sure you want to execute?"): execute_iam_copy(asset_types, dry_run, verify_permissions) if __name__ == "__main__": cli()