tools/custom-role-manager/main.py (311 lines of code) (raw):

#!/usr/bin/env python3 # Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import re import sys import yaml import argparse import logging import fnmatch import googleapiclient.discovery from googleapiclient import http from pythonjsonlogger import jsonlogger import google_auth_httplib2 import google.auth NAME = 'custom-role-manager' TERRAFORM_TEMPLATES = { 'pre': '', 'organization': '''resource "google_organization_iam_custom_role" "{terraform_id}" {{ role_id = "{role_id}" org_id = "{organization_id}" title = "{role_title}" description = "{role_description}" permissions = {role_permissions} }} ''', 'project': '''resource "google_project_iam_custom_role" "{terraform_id}" {{ role_id = "{role_id}" title = "{role_title}" description = "{role_description}" permissions = {role_permissions} }} ''', 'post': '' } TERRAFORM_PRE = False TERRAFORM_RESOURCES = {} def process_permission(logger, role, permissions_to_grant, permission_name): permission_candidate = None if 'include' in role: for include_permission in role['include']: permission_candidate = None if include_permission.startswith( '/') and include_permission.endswith('/'): # Regexp if re.match(include_permission[1:len(include_permission) - 1], permission_name): permission_candidate = permission_name break else: if fnmatch.fnmatch(permission_name, include_permission): permission_candidate = permission_name break if permission_candidate: if 'exclude' in role: for exclude_permission in role['exclude']: if exclude_permission.startswith( '/') and exclude_permission.endswith('/'): # Regexp if re.match( exclude_permission[1:len(exclude_permission) - 1], permission_candidate): permission_candidate = None break else: if fnmatch.fnmatch(permission_candidate, exclude_permission): permission_candidate = None break if permission_candidate: permissions_to_grant.append(permission_candidate) else: permission_candidate = permission_name return permissions_to_grant def process_role(logger, service, role, output_terraform=False): global TERRAFORM_PRE, TERRAFORM_TEMPLATES, TERRAFORM_RESOURCES if 'source' not in role: logger.error('Source not defined for role.', extra={'role': role['id']}) sys.exit(2) role_exists = True role_name = '%s/roles/%s' % (role['parent'], role['id']) if role['parent'].startswith('organizations/'): role_request = service.organizations().roles().get(name=role_name) else: role_request = service.projects().roles().get(name=role_name) try: role_response = role_request.execute() except googleapiclient.errors.HttpError as e: if e.resp.status == 404 or e.resp.status == 400: role_exists = False else: raise e if not isinstance(role['source'], list): sources = [role['source']] else: sources = role['source'] permissions_to_grant = [] for source in sources: if source.startswith('roles/'): source_request = service.roles().get(name=source) source_response = source_request.execute() for p in source_response['includedPermissions']: permissions_to_grant = process_permission( logger, role, permissions_to_grant, p) if source.startswith('//'): next_page_token = None while True: permissions = service.permissions().queryTestablePermissions( body={ 'fullResourceName': source, 'pageToken': next_page_token }).execute() for p in permissions['permissions']: if ('stage' not in p or p['stage'] != 'DEPRECATED') and ( 'customRolesSupportLevel' not in p or p['customRolesSupportLevel'] != 'NOT_SUPPORTED'): permissions_to_grant = process_permission( logger, role, permissions_to_grant, p['name']) else: if 'stage' in p and p['stage'] == 'DEPRECATED': logger.info('Permission %s is deprecated.' % (p['name']), extra={ 'permission': p['name'], }) if 'customRolesSupportLevel' in p and p[ 'customRolesSupportLevel'] == 'NOT_SUPPORTED': logger.info( 'Permission %s is not supported in custom roles.' % (p['name']), extra={ 'permission': p['name'], }) if 'nextPageToken' in permissions: next_page_token = permissions['nextPageToken'] else: break if 'append' in role: for permission in role['append']: permissions_to_grant.append(permission) logger.info('%d permissions determined for role %s.' % (len(permissions_to_grant), role['id']), extra={ 'role': role['id'], 'permissions': permissions_to_grant }) if output_terraform: if not TERRAFORM_PRE: print(TERRAFORM_TEMPLATES['pre']) TERRAFORM_PRE = True organization_id = '' project_id = '' terraform_id = role['tfId'] if 'tfId' in role else role['id'] tf_template = 'project' if 'organizations/' in role['parent']: organization_id = role['parent'].replace('organizations/', '') tf_template = 'organization' TERRAFORM_RESOURCES[ terraform_id] = 'google_organization_iam_custom_role.%s' % ( terraform_id) else: project_id = role['parent'].replace('projects/', '') TERRAFORM_RESOURCES[ terraform_id] = 'google_project_iam_custom_role.%s' % ( terraform_id) role_id = role['id'] role_title = role['title'] if 'title' in role else '' role_description = role['description'] if 'description' in role else '' role_permissions = str(permissions_to_grant).replace('\'', '"') print(TERRAFORM_TEMPLATES[tf_template].format( role_id=role_id, terraform_id=terraform_id, organization_id=organization_id, project_id=project_id, role_title=role_title.replace('"', '\\"'), role_description=role_description.replace('"', '\\"'), role_permissions=role_permissions)) elif not role_exists: logger.info('Creating role: %s' % (role['id']), extra={ 'role': role['id'], }) create_role_request_body = { 'roleId': role['id'], 'role': { 'title': role['title'], 'description': role['description'], 'includedPermissions': permissions_to_grant, 'stage': role['stage'], } } if role['parent'].startswith('organizations/'): role_create_request = service.organizations().roles().create( parent=role['parent'], body=create_role_request_body) else: role_create_request = service.projects().roles().create( parent=role['parent'], body=create_role_request_body) role_create_response = role_create_request.execute() logger.warning('Role created: %s' % (role['id']), extra={ 'role': role['id'], 'role_name': role_create_response['name'], 'etag': role_create_response['etag'] }) elif not output_terraform: if 'includedPermissions' not in role_response: role_response['includedPermissions'] = [] added_permissions = set(permissions_to_grant) - set( role_response['includedPermissions']) removed_permissions = set( role_response['includedPermissions']) - set(permissions_to_grant) if len(added_permissions) > 0 or len(removed_permissions) > 0: logger.info('Permissions changed for role: %s' % (role['id']), extra={ 'role': role['id'], 'role_name': role_response['name'], 'added_permissions': list(added_permissions), 'removed_permissions': list(removed_permissions), 'etag': role_response['etag'] }) patch_role_request_body = { 'name': role_response['name'], 'title': role['title'], 'description': role['description'], 'includedPermissions': permissions_to_grant, 'stage': role['stage'], 'etag': role_response['etag'] } if role['parent'].startswith('organizations/'): role_patch_request = service.organizations().roles().patch( name=role_response['name'], body=patch_role_request_body) else: role_patch_request = service.projects().roles().patch( name=role_response['name'], body=patch_role_request_body) role_patch_response = role_patch_request.execute() logger.warning('Role updated: %s' % (role['id']), extra={ 'role': role['id'], 'role_name': role_patch_response['name'], 'added_permissions': list(added_permissions), 'removed_permissions': list(removed_permissions), 'etag': role_patch_response['etag'] }) else: logger.info('Permissions unchanged for role: %s' % (role['id']), extra={ 'role': role['id'], 'role_name': role_response['name'], 'etag': role_response['etag'] }) def setup_logging(): logger = logging.getLogger(NAME) if os.getenv('LOG_LEVEL'): logger.setLevel(int(os.getenv('LOG_LEVEL'))) else: logger.setLevel(logging.INFO) json_handler = logging.StreamHandler() formatter = jsonlogger.JsonFormatter() json_handler.setFormatter(formatter) logger.addHandler(json_handler) return logger logger = setup_logging() def process_pubsub(event, context): logger.info('%s starting to process Pub/Sub message.' % (NAME)) with open('config.yaml') as f: config = yaml.load(f, Loader=yaml.SafeLoader) if 'roles' not in config: logger.error('Roles are not defined in the configuration!') sys.exit(1) for role in config['roles']: process_role(logger, service, role) if __name__ == '__main__': arg_parser = argparse.ArgumentParser( description= 'Create custom roles by filtering existing permissions or roles') arg_parser.add_argument('--config', type=str, help='Configuration file', default='config.yaml') arg_parser.add_argument( '--terraform', action='store_true', help='Output a Terraform compatible custom role definition instead', default=False) args = arg_parser.parse_args() credentials, project_id = google.auth.default( ['https://www.googleapis.com/auth/cloud-platform']) branded_http = google_auth_httplib2.AuthorizedHttp(credentials) branded_http = http.set_user_agent( branded_http, 'google-pso-tool/custom-role-manager/1.0.0') service = googleapiclient.discovery.build('iam', 'v1', http=branded_http) with open(args.config) as f: config = yaml.load(f, Loader=yaml.SafeLoader) if 'roles' not in config: logger.error('Roles are not defined in the configuration!') sys.exit(1) if 'terraform' in config: for k, v in config['terraform'].items(): TERRAFORM_TEMPLATES[k] = v for role in config['roles']: process_role(logger, service, role, args.terraform) if TERRAFORM_PRE: resources = '' for k, v in TERRAFORM_RESOURCES.items(): if resources == '': resources = '{' else: resources += ', ' resources += '"%s" = %s' % (k, v) if resources != '': resources += '}' print(TERRAFORM_TEMPLATES['post'].format(resources=resources))