helpers/base.py (244 lines of code) (raw):

# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os import re import json from google.cloud.iam_credentials_v1 import IAMCredentialsClient from google.api_core.gapic_v1 import client_info as grpc_client_info import google_auth_httplib2 import google.auth from googleapiclient import http from google.cloud import resourcemanager_v3 import json_fix # noqa: F401 import tempfile PUBSUB2INBOX_VERSION = '1.8.2' TEMPORARY_DIRECTORY = None class NoCredentialsException(Exception): pass class Context(object): def __init__(self, eventId="", timestamp="", eventType="", resource=""): self.event_id = eventId self.timestamp = timestamp self.event_type = eventType self.resource = resource self.http_response = None def __json__(self): return "{\"event_id\": \"%s\", \"timestamp\": \"%s\", \"event_type\": \"%s\", \"resource\": \"%s\"}" % ( self.event_id, self.timestamp, self.event_type, self.resource, ) def __str__(self): return "{event_id: %s, timestamp: %s, event_type: %s, resource: %s}" % ( self.event_id, self.timestamp, self.event_type, self.resource, ) def get_user_agent(): return 'google-pso-tool/pubsub2inbox/%s' % (PUBSUB2INBOX_VERSION) def get_branded_http(credentials=None): if not credentials: credentials, project_id = google.auth.default( ['https://www.googleapis.com/auth/cloud-platform']) branded_http = google_auth_httplib2.AuthorizedHttp(credentials) branded_http = http.set_user_agent(branded_http, get_user_agent()) return branded_http def get_grpc_client_info(): client_info = grpc_client_info.ClientInfo(user_agent=get_user_agent()) return client_info class BaseHelper: logger = None def __init__(self, jinja_environment): self.project_number_cache = {} self.jinja_environment = jinja_environment self.logger = logging.getLogger('pubsub2inbox') def _init_tempdir(self): global TEMPORARY_DIRECTORY if not TEMPORARY_DIRECTORY: TEMPORARY_DIRECTORY = tempfile.TemporaryDirectory() self.logger.debug('Created temporary directory: %s' % (TEMPORARY_DIRECTORY.name)) os.chdir(TEMPORARY_DIRECTORY.name) def _clean_tempdir(self): global TEMPORARY_DIRECTORY if TEMPORARY_DIRECTORY: self.logger.debug('Cleaning temporary directory: %s' % (TEMPORARY_DIRECTORY.name)) TEMPORARY_DIRECTORY = None def _get_user_agent(self): return get_user_agent() def _get_branded_http(self, credentials=None): return get_branded_http(credentials) def _get_grpc_client_info(self): return get_grpc_client_info() def get_project_number(self, project_id, credentials=None): if project_id in self.project_number_cache: return self.project_number_cache[project_id] client = resourcemanager_v3.ProjectsClient(credentials=credentials) request = resourcemanager_v3.SearchProjectsRequest( query="projectId=%s" % (project_id),) response = client.search_projects(request=request) project = next(iter(response)) if project: self.project_number_cache[project_id] = int( project.name.replace("projects/", "")) return self.project_number_cache[project_id] return None def get_token_for_scopes(self, scopes, service_account=None): if not service_account: service_account = os.getenv('SERVICE_ACCOUNT') if not service_account: raise NoCredentialsException( 'You need to specify a service account for Directory API credentials, either through SERVICE_ACCOUNT environment variable or serviceAccountEmail parameter.' ) client = IAMCredentialsClient() name = 'projects/-/serviceAccounts/%s' % service_account response = client.generate_access_token(name=name, scope=scopes) return response.access_token def _jinja_expand_expr(self, contents, _tpl='config'): expr = self.jinja_environment.compile_expression(contents, undefined_to_none=True) return expr() def _jinja_expand_bool(self, contents, _tpl='config'): if isinstance(contents, bool): return contents var_template = self.jinja_environment.from_string(contents) var_template.name = _tpl val_str = var_template.render().lower() if val_str == 'true' or val_str == 't' or val_str == 'yes' or val_str == 'y' or val_str == '1': return True return False def _jinja_expand_bool_str(self, contents, _tpl='config'): if isinstance(contents, bool): return contents var_template = self.jinja_environment.from_string(contents) var_template.name = _tpl val_str = var_template.render().lower() if val_str == 'true' or val_str == 't' or val_str == 'yes' or val_str == 'y' or val_str == '1': return True if val_str == 'false' or val_str == 'f' or val_str == 'no' or val_str == 'n' or val_str == '0': return False return val_str def _jinja_expand_string(self, contents, _tpl='config'): var_template = self.jinja_environment.from_string(contents) var_template.name = _tpl val_str = var_template.render() return val_str def _jinja_expand_int(self, contents, _tpl='config'): if isinstance(contents, int): return contents if isinstance(contents, float): return int(contents) var_template = self.jinja_environment.from_string(contents) var_template.name = _tpl val_str = var_template.render() return int(val_str) def _jinja_expand_float(self, contents, _tpl='config'): if isinstance(contents, float): return contents var_template = self.jinja_environment.from_string(contents) var_template.name = _tpl val_str = var_template.render() return float(val_str) def _jinja_var_to_list(self, _var, _tpl='config'): if isinstance(_var, list): return _var else: var_template = self.jinja_environment.from_string(_var) var_template.name = _tpl val_str = var_template.render() try: return json.loads(val_str) except Exception: self.logger.debug( 'Error parsing variable to list, trying command or CR separated.', extra={ 'template': _var, 'value': val_str }) vals = list( filter( lambda x: x.strip() != "", re.split('[\n,]', val_str), )) return list(map(lambda x: x.strip(), vals)) def _jinja_var_to_list_all(self, _var, _tpl='config'): if isinstance(_var, list): return self._jinja_expand_list(_var, _tpl) else: var_template = self.jinja_environment.from_string(_var) var_template.name = _tpl val_str = var_template.render() try: return json.loads(val_str) except Exception: self.logger.debug( 'Error parsing variable to list, trying command or CR separated.', extra={ 'template': _var, 'value': val_str }) vals = list( filter( lambda x: x.strip() != "", re.split('[\n,]', val_str), )) return list(map(lambda x: x.strip(), vals)) def _jinja_expand_dict(self, _var, _tpl='config'): for k, v in _var.items(): if not isinstance(v, dict): if isinstance(v, str): _var[k] = self._jinja_expand_string(v) else: _var[k] = self._jinja_expand_dict(_var[k]) return _var def _jinja_expand_dict_all(self, _var, _tpl='config'): if not isinstance(_var, dict): return _var for k, v in _var.items(): if not isinstance(v, dict): if isinstance(v, str): _var[k] = self._jinja_expand_string(v) if isinstance(v, list): for idx, lv in enumerate(_var[k]): if isinstance(lv, dict): _var[k][idx] = self._jinja_expand_dict_all(lv) if isinstance(lv, str): _var[k][idx] = self._jinja_expand_string(lv) else: _var[k] = self._jinja_expand_dict_all(_var[k]) return _var def _jinja_expand_dict_all_expr(self, _var, _tpl='config'): _new_var = {} if not isinstance(_var, dict): return _var for k, v in _var.items(): if not isinstance(v, dict): if isinstance(v, str): if k.endswith('Expr'): _new_var[k[0:len(k) - 4]] = self._jinja_expand_expr(v) else: _new_var[k] = self._jinja_expand_string(v) if isinstance(v, int): _new_var[k] = self._jinja_expand_int(v) if isinstance(v, float): _new_var[k] = self._jinja_expand_float(v) if isinstance(v, list): _new_var[k] = [] for idx, lv in enumerate(_var[k]): if isinstance(lv, dict): _new_var[k].append( self._jinja_expand_dict_all_expr(lv)) if isinstance(lv, str): _new_var[k].append(self._jinja_expand_string(lv)) else: _new_var[k] = self._jinja_expand_dict_all_expr(_var[k]) return _new_var def _jinja_expand_list(self, _var, _tpl='config'): if not isinstance(_var, list): return _var for idx, v in enumerate(_var): if isinstance(v, str): _var[idx] = self._jinja_expand_string(v) return _var