access.py (135 lines of code) (raw):

# Copyright 2023-2024 Google, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64, json, configparser, requests import google.oauth2.service_account import google.oauth2.credentials # user credentials from oauth2client.client import GoogleCredentials from googleapiclient import discovery import TagEngineStoreHandler as tesh from common import log_info, log_error config = configparser.ConfigParser() config.read("tagengine.ini") TAG_CREATOR_SA = config['DEFAULT']['TAG_CREATOR_SA'].strip() SCOPES = ['openid', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/userinfo.email'] ##################### Methods used by API only ################# # get the service account intended to process request def get_requested_service_account(json): store = tesh.TagEngineStoreHandler() if isinstance(json, dict) and 'service_account' in json: service_account = json['service_account'] elif isinstance(json, dict) and 'config_uuid' in json and 'config_type' in json: service_account = store.lookup_service_account(json['config_type'], json['config_uuid']) elif isinstance(json, dict) and 'job_uuid' in json: config_uuid, config_type = store.read_config_by_job(json['job_uuid']) if config_uuid != None and config_type != None: service_account = store.lookup_service_account(config_type, config_uuid) else: service_account = None else: service_account = TAG_CREATOR_SA return service_account def check_user_credentials_from_api(tag_creator_sa, tag_invoker_account): print('enter check_user_credentials_from_api') has_permission = False credentials = GoogleCredentials.get_application_default() service = discovery.build('iam', 'v1', credentials=credentials) start_index = tag_creator_sa.index('@') + 1 end_index = tag_creator_sa.index('.') project = tag_creator_sa[start_index:end_index] resource = 'projects/{}/serviceAccounts/{}'.format(project, tag_creator_sa) try: request = service.projects().serviceAccounts().getIamPolicy(resource=resource) iam_policy = request.execute() if "bindings" not in iam_policy: return has_permission for binding in iam_policy.get('bindings'): if binding.get('role') == 'roles/iam.serviceAccountUser': if f"user:{tag_invoker_account}" in binding.get('members') or f"serviceAccount:{tag_invoker_account}" in binding.get('members'): has_permission = True except Exception as e: msg = 'Error calling getIamPolicy on: {}'.format(resource) log_error(msg, e) return has_permission def do_authentication(headers, json_request, ENABLE_AUTH): status = True response = None tag_invoker_account = get_tag_invoker_account(headers.get('Authorization')) tag_creator_sa = get_requested_service_account(json_request) if tag_creator_sa == None and 'config_uuid' in json_request: status = False response = { "status": "error", "message": "Fatal error: Invalid config_uuid in the json request." } return status, response, tag_creator_sa if tag_creator_sa == None and 'job_uuid' in json_request: status = False response = { "status": "error", "message": "Fatal error: Invalid job_uuid in the json request." } return status, response, tag_creator_sa if ENABLE_AUTH == False: status = True response = None return status, response, tag_creator_sa has_permission = check_user_credentials_from_api(tag_creator_sa, tag_invoker_account) print('user has permission:', has_permission) if has_permission == False: status = False response = { "status": "error", "message": "Fatal error: User " + tag_invoker_account + " is missing roles/iam.serviceAccountUser on " + tag_creator_sa, } return status, response, tag_creator_sa return status, response, tag_creator_sa # get the account (user or service) which invoked the job # this account is then passed to the tag history function to record the tag creation / update event def get_tag_invoker_account(raw_token): token = raw_token.replace("Bearer ", "").replace("bearer ", "").replace(" ","") if '.' in token: token = token.split('.')[1] padded_token = token + "="*divmod(len(token),4)[1] raw_decode = base64.b64decode(padded_token) txt_decode = raw_decode.decode("utf-8") token_obj = json.loads(txt_decode) tag_invoker_account = token_obj['email'] return tag_invoker_account # *************************************************************** # ################ Methods used by both API + UI paths ############## # *************************************************************** # # used when TAG_CREATOR_SA credentials are needed def get_target_credentials(target_service_account): from google.auth import impersonated_credentials # service account credentials source_credentials, _ = google.auth.default() try: target_credentials = impersonated_credentials.Credentials(source_credentials=source_credentials, target_principal=target_service_account, target_scopes=SCOPES, lifetime=3600) # lifetime is in seconds -> 60 minutes per request success = True except Exception as e: print('Error impersonating credentials: ', e) success = False return target_credentials, success ##################### Methods used by UI path only ################# def check_user_credentials_from_ui(credentials_dict, service_account): has_permission = False credentials = dict_to_credentials(credentials_dict) service = discovery.build('iam', 'v1', credentials=credentials) # get the GCP project which owns the service account start_index = service_account.index('@') + 1 end_index = service_account.index('.') service_account_project = service_account[start_index:end_index] resource = 'projects/{}/serviceAccounts/{}'.format(service_account_project, service_account) permissions = ["iam.serviceAccounts.actAs", "iam.serviceAccounts.get"] # permissions associated with roles/iam.serviceAccountUser body={"permissions": permissions} request = service.projects().serviceAccounts().testIamPermissions(resource=resource, body=body) response = request.execute() if 'permissions' in response: #print('allowed permissions:', response['permissions']) user_permissions = response['permissions'] if "iam.serviceAccounts.actAs" in user_permissions and "iam.serviceAccounts.get" in user_permissions: has_permission = True return has_permission def credentials_to_dict(credentials): return {'token': credentials.token, 'refresh_token': credentials.refresh_token, 'token_uri': credentials.token_uri, 'client_id': credentials.client_id, 'client_secret': credentials.client_secret, 'scopes': credentials.scopes, 'id_token': credentials.id_token} def dict_to_credentials(_dict): credentials = google.oauth2.credentials.Credentials(token=_dict['token'], refresh_token=_dict['refresh_token'], token_uri=_dict['token_uri'], client_id=_dict['client_id'], client_secret=_dict['client_secret'], scopes=_dict['scopes'], id_token=_dict['id_token']) return credentials