dialogflow-cx/vpc-sc-demo/backend/status_blueprint.py (152 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Blueprint for checking project status.""" import json import logging import flask import requests import status_utilities as su status = flask.Blueprint("status", __name__) logger = logging.getLogger(__name__) @status.route("/restricted_services_status_cloudfunctions", methods=["GET"]) def restricted_services_status_cloudfunctions(): """Get boolean status of wether cloudfunctions is restricted.""" return su.get_restricted_service_status(flask.request, "cloudfunctions_restricted") @status.route("/restricted_services_status_dialogflow", methods=["GET"]) def restricted_services_status_dialogflow(): """Get boolean status of wether dialogflow is restricted.""" return su.get_restricted_service_status(flask.request, "dialogflow_restricted") @status.route("/webhook_ingress_internal_only_status", methods=["GET"]) def webhook_ingress_internal_only_status(): """Get boolean status of internally restricted webhook ingress.""" data = su.get_token_and_project(flask.request) if "response" in data: return data["response"] project_id, token = data["project_id"], data["token"] region = flask.request.args["region"] webhook_name = flask.request.args["webhook_name"] response = su.check_function_exists(token, project_id, region, webhook_name) if "response" in response: return response["response"] headers = {} headers["x-goog-user-project"] = project_id headers["Authorization"] = f"Bearer {token}" result = requests.get( ( "https://cloudfunctions.googleapis.com/v1/projects/" f"{project_id}/locations/{region}/functions/{webhook_name}" ), headers=headers, timeout=10, ) if result.status_code != 200: logger.info(" cloudfunctions API rejected request: %s", result.text) return flask.abort(result.status_code) result_dict = result.json() if result_dict["ingressSettings"] in [ "ALLOW_INTERNAL_ONLY", "ALLOW_INTERNAL_AND_GCLB", ]: return flask.Response(status=200, response=json.dumps({"status": True})) return flask.Response(status=200, response=json.dumps({"status": False})) @status.route("/webhook_access_allow_unauthenticated_status", methods=["GET"]) def webhook_access_allow_unauthenticated_status(): # pylint: disable=too-many-branches,too-many-return-statements """Get boolean status of allow unauthenticated webhook access.""" data = su.get_token_and_project(flask.request) if "response" in data: return data["response"] project_id, token = data["project_id"], data["token"] region = flask.request.args["region"] webhook_name = flask.request.args["webhook_name"] response = su.check_function_exists(token, project_id, region, webhook_name) if "response" in response: return response["response"] headers = {} headers["x-goog-user-project"] = project_id headers["Authorization"] = f"Bearer {token}" result = requests.get( ( "https://cloudfunctions.googleapis.com/v2/" f"projects/{project_id}/locations/{region}/" f"functions/{webhook_name}:getIamPolicy" ), headers=headers, timeout=10, ) if result.status_code == 403: if (result.json()["error"]["status"] == "PERMISSION_DENIED") and ( result.json()["error"]["message"].startswith( "Permission 'cloudfunctions.functions.getIamPolicy' denied" ) ): return flask.Response( status=200, response=json.dumps( {"status": "BLOCKED", "reason": "PERMISSION_DENIED"} ), ) if (result.json()["error"]["status"] == "PERMISSION_DENIED") and ( result.json()["error"]["message"].startswith( "Cloud Functions API has not been used in project" ) ): return flask.Response( status=200, response=json.dumps( {"status": "BLOCKED", "reason": "CLOUDFUNCTIONS_API_DISABLED"} ), ) for details in result.json()["error"]["details"]: for violation in details["violations"]: if violation["type"] == "VPC_SERVICE_CONTROLS": return flask.Response( status=200, response=json.dumps( {"status": "BLOCKED", "reason": "VPC_SERVICE_CONTROLS"} ), ) return flask.Response(status=500, response=result.text) if result.status_code != 200: logger.info(" cloudfunctions API rejected request: %s", result.text) return flask.abort(result.status_code) policy_dict = result.json() all_users_is_invoker_member = False for binding in policy_dict.get("bindings", []): for member in binding.get("members", []): if ( member == "allUsers" and binding["role"] == "roles/cloudfunctions.invoker" ): all_users_is_invoker_member = True logger.info(" all_users_is_invoker_member: %s", all_users_is_invoker_member) if all_users_is_invoker_member: return flask.Response(status=200, response=json.dumps({"status": False})) return flask.Response(status=200, response=json.dumps({"status": True})) @status.route("/service_directory_webhook_fulfillment_status", methods=["GET"]) def service_directory_webhook_fulfillment_status(): """Get boolean status of service directory usage in webhook.""" data = su.get_token_and_project(flask.request) if "response" in data: return data["response"] project_id, token = data["project_id"], data["token"] untrusted_region = flask.request.args["region"] if untrusted_region in ["us-central1"]: region = untrusted_region else: return flask.Response( status=200, response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_REGION"}), ) result = su.get_agents(token, project_id, region) if "response" in result: return result["response"] if "Telecommunications" not in result["data"]: return flask.Response( status=200, response=json.dumps({"status": "BLOCKED", "reason": "AGENT_NOT_FOUND"}), ) agent_name = result["data"]["Telecommunications"]["name"] result = su.get_webhooks(token, agent_name, project_id, region) if "response" in result: response = result["response"] else: webhook_dict = result["data"]["cxPrebuiltAgentsTelecom"] if "serviceDirectory" in webhook_dict: response = flask.Response(status=200, response=json.dumps({"status": True})) else: response = flask.Response( status=200, response=json.dumps({"status": False}) ) return response