terraform-modules/cloud-functions/src/provide-secrets-permissions/main.py (104 lines of code) (raw):

def update_permissions(event, context): import sys import os """Background Cloud Function to be triggered by Cloud Storage. Args: event (dict): The dictionary with data specific to this type of event. The `data` field contains a description of the event in the Cloud Storage `object` format described here: https://cloud.google.com/storage/docs/json_api/v1/objects#resource context (google.cloud.functions.Context): Metadata of triggering event. Returns: None; the function reads the service accounts from blob and grant them IAM roles """ sa_list = [] validated_sa_list = [] roles = ['roles/secretmanager.secretAccessor','roles/secretmanager.viewer'] project_id = os.environ['GCP_PROJECT'] print('Printing the payload.\n') print('EVENT:' , event) print('Event ID:' , context.event_id) print('Event type:', context.event_type) print('Bucket:', event['bucket']) print('File:', event['name']) print('Metageneration:', event['metageneration']) print('Created:', event['timeCreated']) print('Updated:', event['updated']) bucket_name = event['bucket'] blob_name = event['name'] print('Fetching Service Accounts from the file',bucket_name + '/' + blob_name) try: sa_list = fetch_sa_from_file(bucket_name,blob_name) except Exception as e: print('Unable to fetch Service Accounts from ' + bucket_name + '/' + blob_name + '.') exception_handler('fetch_sa_from_file', str(e)) print('Fetching policy of the project.\n') try: policy = get_policy(project_id) print('The policy is \n', policy) except Exception as e: print('Unable to fetch policy of the project ' + project_id) exception_handler('get_policy', str(e)) for sa in sa_list: for role in roles: try: print('Adding role ' + role + ' to the member ' + sa) policy = generate_modified_policy(policy,role,sa) except Exception as e: print('Unable to create modified policy') exception_handler('generate_modified_policy', str(e)) print('Setting the generated policy so the Service Accounts get the required roles.\n') try: policy = set_policy(project_id,policy) print('Printing new policy', policy) except Exception as e: print('Unable to set the policy \n.') exception_handler('set_policy', str(e)) def fetch_sa_from_file(bucket_name, blob_name): """Function to fetch the Service Accounts written in the text GCS object. Args: bucket_name (string): GCS bucket name. blob_name (string): Object containing the service accounts. Returns: sa_list (list); list containing service accounts """ from google.cloud import storage storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) sa_list = [] with blob.open("r") as f: sa_list.extend(f.read().split()) return(sa_list) def get_policy(project_id): """Function to fetch the policy of the project. Args: project_id (string); project id whose policy is to be fetched. Returns: policy (dict); returns IAM policy. """ import os from google.oauth2 import service_account import googleapiclient.discovery service = googleapiclient.discovery.build( "cloudresourcemanager", "v1" ) policy = ( service.projects() .getIamPolicy( resource=project_id, body={"options": {"requestedPolicyVersion": 1}}, ) .execute() ) return(policy) def generate_modified_policy(policy, role, member): """Function to generate the new policy of the project. Args: policy (string); IAM policy of the project. role (string); IAM role that needs to be added to the policy for the given member. member (string); Service Account which needs to be added for the roles in the policy. Returns: policy (dict); new IAM policy of the project. """ import os from google.oauth2 import service_account import googleapiclient.discovery role_binding_exists = 0 #If role binding exists, add a member for b in policy['bindings']: if b["role"] == role: b["members"].append('serviceAccount:' + member) role_binding_exists = 1 break #If role binding doesnt exists, add one if role_binding_exists == 0: binding = {"role": role, "members": ['serviceAccount:' + member]} policy["bindings"].append(binding) print(policy) return policy def set_policy(project_id,policy): """Function to fetch the policy of the project. Args: project_id(string) policy (dict); generated IAM policy of the project that needs to be set. Returns: new IAM policy. """ import os from google.oauth2 import service_account import googleapiclient.discovery service = googleapiclient.discovery.build( "cloudresourcemanager", "v1" ) policy = ( service.projects() .setIamPolicy(resource=project_id, body={"policy": policy}) .execute() ) return policy def exception_handler(function,message): """Function to handle exceptions and exit. Args: function (string); name of the function message(string); error message Returns: NA """ import sys print('Function ' + function + ' failed with the following error: ' + message) print('Exiting with status code 1..') sys.exit(1)