dialogflow-cx/vpc-sc-demo/backend/asset_utilities.py (299 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=inconsistent-return-statements

"""Utility Module to set and remove terraform assets."""

import json
import logging
import os

import flask
import google.auth.transport.requests
import requests
import status_utilities as su
from flask import Response
from google.oauth2 import service_account
from invoke import task

logger = logging.getLogger(__name__)

# Maps a group name to the set of terraform state addresses that must ALL be
# present for the group itself to be reported as present (see tf_state_list).
# The "all" entry is expressed in terms of the other group names, so it must
# stay last: tf_state_list walks this dict in insertion order.
RESOURCE_GROUP = {
    "module.service_directory": {
        "module.service_directory.google_service_directory_endpoint.reverse_proxy",
        "module.service_directory.google_service_directory_namespace.reverse_proxy",
        "module.service_directory.google_service_directory_service.reverse_proxy",
        (
            "module.service_perimeter.google_access_context_manager_service_perimeter"
            ".service_perimeter[0]"
        ),
    },
    "module.services": {
        "module.services.google_project_service.appengine",
        "google_project_service.artifactregistry",
        "module.services.google_project_service.run",
        "module.services.google_project_service.vpcaccess",
        "google_project_service.accesscontextmanager",
        "google_project_service.cloudbilling",
        "google_project_service.cloudbuild",
        "google_project_service.cloudfunctions",
        "google_project_service.compute",
        "google_project_service.dialogflow",
        "google_project_service.iam",
        "google_project_service.servicedirectory",
    },
    "module.vpc_network": {
        "module.vpc_network.google_artifact_registry_repository.webhook_registry",
        "module.vpc_network.google_cloudbuild_trigger.reverse_proxy_server",
        "module.vpc_network.google_compute_address.reverse_proxy_address",
        "module.vpc_network.google_compute_firewall.allow",
        "module.vpc_network.google_compute_firewall.allow_dialogflow",
        "module.vpc_network.google_compute_instance.reverse_proxy_server",
        "module.vpc_network.google_compute_network.vpc_network",
        "module.vpc_network.google_compute_router.nat_router",
        "module.vpc_network.google_compute_router_nat.nat_manual",
        "module.vpc_network.google_compute_subnetwork.reverse_proxy_subnetwork",
        "module.vpc_network.google_project_iam_member.dfsa_sd_pscAuthorizedService",
        "module.vpc_network.google_project_iam_member.dfsa_sd_viewer",
        "module.vpc_network.google_project_service_identity.dfsa",
        "module.vpc_network.google_pubsub_topic.reverse_proxy_server_build",
        "module.vpc_network.google_storage_bucket_object.proxy_server_source",
    },
    "module.webhook_agent": {
        "module.webhook_agent.google_cloudfunctions_function.webhook",
        "google_storage_bucket.bucket",
        "module.webhook_agent.google_storage_bucket_object.webhook",
        "module.webhook_agent.google_dialogflow_cx_agent.full_agent",
    },
    "all": {
        "module.webhook_agent",
        "module.vpc_network",
        "module.services",
        "module.service_directory",
    },
}


def get_credentials():
    """Helper function to get service account credentials.

    Returns:
        Credentials loaded from the key file at "/backend/demo-server-key.json",
        scoped to "https://www.googleapis.com/auth/cloud-platform".
    """
    return service_account.Credentials.from_service_account_file(
        "/backend/demo-server-key.json",
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )


def get_access_policy_title(token, access_policy_id):
    """Get access_policy_title using the accesscontextmanager API.

    Args:
        token: OAuth bearer token sent in the Authorization header.
        access_policy_id: Identifier placed in the accessPolicies URL path.

    Returns:
        {"access_policy_title": <title>} on HTTP 200; otherwise
        {"response": flask.Response} with status 500 and the upstream body.
    """
    headers = {}
    headers["Authorization"] = f"Bearer {token}"
    result = requests.get(
        f"https://accesscontextmanager.googleapis.com/v1/accessPolicies/{access_policy_id}",
        headers=headers,
        timeout=10,
    )
    if result.status_code != 200:
        # Should this return the status from the result.status_code?
        return {"response": flask.Response(status=500, response=result.text)}
    return {"access_policy_title": result.json()["title"]}


def get_terraform_env(access_token, request_args, debug=False):
    """Build a dictionary of environment variables for a terraform run.

    Args:
        access_token: Token exported as GOOGLE_OAUTH_ACCESS_TOKEN.
        request_args: Mapping providing "project_id", "bucket" and "region",
            and optionally "access_policy_title".
        debug: When true, TF_LOG is set to "DEBUG".

    Returns:
        The environment dict, or, when the access policy lookup fails, the
        dict returned by su.get_access_policy_name (it has a "response" key).
    """
    env = {}
    env["GOOGLE_OAUTH_ACCESS_TOKEN"] = access_token
    env["TF_VAR_project_id"] = request_args["project_id"]
    env["TF_VAR_bucket"] = request_args["bucket"]
    env["TF_VAR_region"] = request_args["region"]

    if request_args.get("access_policy_title", None):
        response = su.get_access_policy_name(
            access_token,
            request_args["access_policy_title"],
            request_args["project_id"],
            error_code=500,
        )
        if "response" in response:
            # Propagate the lookup failure to the caller unchanged.
            return response
        env["TF_VAR_access_policy_name"] = response["access_policy_name"]
    else:
        env["TF_VAR_access_policy_name"] = "null"

    if debug:
        env["TF_LOG"] = "DEBUG"
    return env


@task
def tf_init(context, module, workdir, env, prefix):
    """Initialize terraform.

    The backend is configured with the service account token rather than the
    user's token, so the user's token is temporarily swapped out of ``env``.

    Args:
        context: invoke context used to run the shell command.
        module: Source path copied into ``workdir`` before running terraform.
        workdir: Directory passed to terraform via -chdir.
        env: Environment dict; must contain GOOGLE_OAUTH_ACCESS_TOKEN. It is
            mutated during the call and restored before returning.
        prefix: Backend state prefix.

    Returns:
        A 500 Response with stdout/stderr when terraform exits non-zero,
        otherwise None.
    """
    user_access_token = env.pop("GOOGLE_OAUTH_ACCESS_TOKEN")
    debug = "TF_LOG" in env

    try:
        credentials = get_credentials()
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        env["GOOGLE_OAUTH_ACCESS_TOKEN"] = credentials.token

        promise = context.run(
            (
                f"cp {module} {workdir} && "
                f"terraform -chdir={workdir} init "
                "-upgrade -reconfigure "
                f'-backend-config="access_token={env["GOOGLE_OAUTH_ACCESS_TOKEN"]}" '
                f'-backend-config="bucket={os.environ["TF_PLAN_STORAGE_BUCKET"]}" '
                f'-backend-config="prefix={prefix}"'
            ),
            warn=True,
            hide=True,
            asynchronous=True,
        )
        result = promise.join()
    finally:
        # Always hand the caller's token back, even if the credential refresh
        # or the command itself raised; ``env`` is shared with the caller.
        env["GOOGLE_OAUTH_ACCESS_TOKEN"] = user_access_token

    if debug:
        logging.debug(result.exited)
        logging.debug(result.stdout)
        logging.debug(result.stderr)

    if result.exited:
        return Response(
            status=500,
            response=json.dumps(
                {
                    "status": "ERROR",
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                }
            ),
        )


@task
def tf_plan(context, module, workdir, env, target=None):
    """Create terraform plan.

    Args:
        context: invoke context used to run the shell command.
        module: Source path copied into ``workdir`` before running terraform.
        workdir: Directory passed to terraform via -chdir.
        env: Environment dict for the command; must contain
            GOOGLE_OAUTH_ACCESS_TOKEN. Presence of TF_LOG enables debug mode.
        target: Optional resource address passed as -target.

    Returns:
        In non-debug mode: {"response": Response} (status 500) if terraform
        reported any error-level message, else {"hooks": {...}} with the hook
        payloads grouped by message type. In debug mode the output is only
        logged and None is returned.
    """
    debug = "TF_LOG" in env
    target_option = f"-target={target}" if target else ""
    # JSON output is only requested (and parsed) outside of debug mode.
    json_option = "-json" if not debug else ""

    promise = context.run(
        (
            f"cp {module} {workdir} && "
            f'terraform -chdir="{workdir}" plan {json_option} '
            "-refresh-only -var "
            f'access_token=\'{env["GOOGLE_OAUTH_ACCESS_TOKEN"]}\' {target_option}'
        ),
        warn=True,
        hide=True,
        asynchronous=True,
        env=env,
    )
    result = promise.join()

    if debug:
        logging.debug(result.exited)
        logging.debug(result.stdout)
        logging.debug(result.stderr)
    else:
        errors = []
        hooks = {
            "refresh_start": [],
            "refresh_complete": [],
            "apply_complete": [],
            "apply_start": [],
        }
        lines = result.stdout.split("\n")
        for line in lines:
            if line.strip():
                try:
                    message = json.loads(line.strip())
                    if message["@level"] == "error":
                        errors.append(message)
                    if "hook" in message:
                        hooks[message["type"]].append(message["hook"])
                except (json.JSONDecodeError, KeyError):
                    # JSONDecodeError: the line is not JSON at all.
                    # KeyError: a missing field or a hook type not tracked above.
                    logging.debug("COULD NOT LOAD: %s", line)

        if errors:
            return {
                "response": Response(
                    status=500,
                    response=json.dumps(
                        {
                            "status": "ERROR",
                            "errors": errors,
                        }
                    ),
                )
            }
        return {"hooks": hooks}


@task
def tf_apply(  # pylint: disable=too-many-arguments,too-many-locals
    context,
    module,
    workdir,
    env,
    destroy,
    target=None,
    verbose=False,
):
    """Apply terraform plan.

    Args:
        context: invoke context used to run the shell command.
        module: Source path copied into ``workdir`` before running terraform.
        workdir: Directory passed to terraform via -chdir.
        env: Environment dict for the command; must contain
            GOOGLE_OAUTH_ACCESS_TOKEN. Presence of TF_LOG enables debug mode.
        destroy: When true, --destroy is added to the apply command.
        target: Optional resource address passed as -target.
        verbose: When true, TF_LOG="DEBUG" is exported in the command itself.

    Returns:
        In non-debug mode, a 500 Response (not wrapped in a dict) when
        terraform reported any error-level message; otherwise None.
    """
    debug = "TF_LOG" in env
    target_option = f"-target={target}" if target else ""
    json_option = "-json" if not debug else ""
    destroy_option = "--destroy" if destroy else ""
    verbose_option = 'export TF_LOG="DEBUG" &&' if verbose else ""

    promise = context.run(
        f"cp {module} {workdir} &&"
        f"{verbose_option}"
        f'terraform -chdir="{workdir}" apply -lock-timeout=10s {json_option}'
        f' --auto-approve -var access_token=\'{env["GOOGLE_OAUTH_ACCESS_TOKEN"]}\' '
        f"{destroy_option} {target_option}",
        warn=True,
        hide=None,
        asynchronous=True,
        env=env,
    )
    result = promise.join()

    if debug:
        logging.debug(result.exited)
        logging.debug(result.stdout)
        logging.debug(result.stderr)
    else:
        errors = []
        lines = result.stdout.split("\n")
        for line in lines:
            if line.strip():
                try:
                    message = json.loads(line)
                    if message["@level"] == "error":
                        errors.append(message)
                except json.decoder.JSONDecodeError:
                    logging.debug("COULD NOT LOAD: %s", line)

        if errors:
            # Should return dict to match the other function...
            return Response(
                status=500,
                response=json.dumps(
                    {
                        "status": "ERROR",
                        "errors": errors,
                    }
                ),
            )


@task
def tf_state_list(context, module, workdir, env):
    """Get list of all states.

    Args:
        context: invoke context used to run the shell command.
        module: Source path copied into ``workdir`` before running terraform.
        workdir: Directory passed to terraform via -chdir.
        env: Environment dict for the command. Presence of TF_LOG enables
            debug logging of the command result.

    Returns:
        {"response": Response} (status 500) when terraform exits non-zero;
        otherwise {"resources": [...]} holding every address printed by
        ``terraform state list`` plus the name of each RESOURCE_GROUP entry
        whose members are all present.
    """
    debug = "TF_LOG" in env
    promise = context.run(
        f'\
        cp {module} {workdir} &&\
        terraform -chdir="{workdir}" state list',
        warn=True,
        hide=True,
        asynchronous=True,
        env=env,
    )
    result = promise.join()

    if debug:
        logging.debug(result.exited)
        logging.debug(result.stdout)
        logging.debug(result.stderr)

    if result.exited:
        return {
            "response": Response(
                status=500,
                response=json.dumps(
                    {
                        "status": "ERROR",
                        "stdout": result.stdout,
                        "stderr": result.stderr,
                    }
                ),
            )
        }

    status_dict = {"resources": result.stdout.split()}
    # Track membership in a set that grows together with the list: later
    # groups (notably "all") are defined in terms of earlier group names.
    present = set(status_dict["resources"])
    for group_name, group_resources in RESOURCE_GROUP.items():
        if group_resources.issubset(present):
            status_dict["resources"].append(group_name)
            present.add(group_name)
    logging.debug(status_dict["resources"])
    return status_dict


def get_debug(request):
    """Get boolean to engage debug mode for terraform.

    Args:
        request: Object exposing ``args.get``; the "debug" argument is read.

    Returns:
        True when the "debug" argument equals "true" or when the root logger
        level is DEBUG or lower; False otherwise.
    """
    return (request.args.get("debug") == "true") or (
        logging.DEBUG >= logging.root.level
    )


def validate_project_id(project_id, access_token):
    """Confirm if the current project_id is valid for current user.

    Args:
        project_id: Project identifier placed in the Resource Manager URL.
        access_token: OAuth bearer token sent in the Authorization header.

    Returns:
        A 500 Response with reason "UNKNOWN_PROJECT_ID" when the lookup does
        not answer HTTP 200; otherwise None.
    """
    headers = {}
    headers["Authorization"] = f"Bearer {access_token}"
    response = requests.get(
        f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}",
        headers=headers,
        timeout=10,
    )
    if response.status_code != 200:
        return flask.Response(
            status=500,
            response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_PROJECT_ID"}),
        )