dialogflow-cx/vpc-sc-demo/backend/update_blueprint.py (236 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for updating configuration of assets."""
import base64
import json
import logging
import analytics_utilities as au
import flask
import requests
import status_utilities as su
import update_utilities as uu
DOMAIN = "webhook.internal"
update = flask.Blueprint("update", __name__)
logger = logging.getLogger(__name__)
@update.route("/update_webhook_access", methods=["POST"])
def update_webhook_access(): # pylint: disable=too-many-branches
"""Update webhook access to allow/disallow allUsers."""
data = su.get_token_and_project(flask.request)
if "response" in data:
return data["response"]
project_id, token = data["project_id"], data["token"]
region = flask.request.args["region"]
webhook_name = flask.request.args["webhook_name"]
content = flask.request.get_json(silent=True)
internal_only = content["status"]
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = requests.get(
(
f"https://cloudfunctions.googleapis.com/v2/projects/{project_id}"
f"/locations/{region}/functions/{webhook_name}:getIamPolicy"
),
headers=headers,
timeout=10,
)
if response.status_code != 200:
logger.info(
" cloudfunctions API rejected getIamPolicy GET request: %s", response.text
)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
policy_dict = response.json()
all_users_is_invoker_member = False
for binding in policy_dict.get("bindings", []):
for member in binding["members"]:
if (
member == "allUsers"
and binding["role"] == "roles/cloudfunctions.invoker"
):
all_users_is_invoker_member = True
if (not internal_only and all_users_is_invoker_member) or (
(internal_only) and (not all_users_is_invoker_member)
):
# internal_only matches request; no change needed
return flask.Response(status=200)
if internal_only:
for binding in policy_dict.get("bindings", []):
for member in binding["members"]:
if binding["role"] == "roles/cloudfunctions.invoker":
binding["members"] = [
member for member in binding["members"] if member != "allUsers"
]
else:
if "bindings" not in policy_dict or len(policy_dict["bindings"]) == 0:
policy_dict["bindings"] = [
{"role": "roles/cloudfunctions.invoker", "members": []}
]
invoker_role_exists = None
for binding in policy_dict["bindings"]:
if binding["role"] == "roles/cloudfunctions.invoker":
invoker_role_exists = True
binding["members"].append("allUsers")
if not invoker_role_exists:
policy_dict["bindings"].append(
{"role": "roles/cloudfunctions.invoker", "members": ["allUsers"]}
)
response = requests.post(
(
f"https://cloudfunctions.googleapis.com/v1/projects/{project_id}"
f"/locations/{region}/functions/{webhook_name}:setIamPolicy"
),
headers=headers,
json={"policy": policy_dict},
timeout=10,
)
if response.status_code != 200:
logger.info(
" cloudfunctions API rejected setIamPolicy POST request: %s", response.text
)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
response = flask.Response(status=200)
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)
@update.route("/update_webhook_ingress", methods=["POST"])
def update_webhook_ingress():
"""Update webhook ingress to allow/disallow external connections."""
data = su.get_token_and_project(flask.request)
if "response" in data:
return data["response"]
project_id, token = data["project_id"], data["token"]
region = flask.request.args["region"]
webhook_name = flask.request.args["webhook_name"]
content = flask.request.get_json(silent=True)
headers = {}
headers["Content-type"] = "application/json"
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = requests.get(
(
f"https://cloudfunctions.googleapis.com/v1/projects/{project_id}"
f"/locations/{region}/functions/{webhook_name}"
),
headers=headers,
timeout=10,
)
if response.status_code != 200:
logger.info(" cloudfunctions API rejected GET request: %s", response.text)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
webhook_data = response.json()
ingress_settings = "ALLOW_INTERNAL_ONLY" if content["status"] else "ALLOW_ALL"
if webhook_data["ingressSettings"] == ingress_settings:
return flask.Response(status=200)
webhook_data["ingressSettings"] = ingress_settings
response = requests.patch(
(
f"https://cloudfunctions.googleapis.com/v1/projects/{project_id}"
f"/locations/{region}/functions/{webhook_name}"
),
headers=headers,
json=webhook_data,
timeout=10,
)
if response.status_code != 200:
logger.info(" cloudfunctions API rejected PATCH request: %s", response.text)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
response = flask.Response(status=200)
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)
@update.route("/update_security_perimeter_cloudfunctions", methods=["POST"])
def update_security_perimeter_cloudfunctions():
"""Update security perimeter, cloudfunctions."""
response = uu.update_security_perimeter(
flask.request, "cloudfunctions.googleapis.com"
)
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)
@update.route("/update_security_perimeter_dialogflow", methods=["POST"])
def update_security_perimeter_dialogflow():
"""Update security perimeter, dialogflow."""
response = uu.update_security_perimeter(flask.request, "dialogflow.googleapis.com")
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)
@update.route("/update_service_directory_webhook_fulfillment", methods=["POST"])
def update_service_directory_webhook_fulfillment(): # pylint: disable=too-many-return-statements,too-many-locals
"""Update agent webhook; toggle between service directory and generic webhook."""
data = su.get_token_and_project(flask.request)
if "response" in data:
return data["response"]
project_id, token = data["project_id"], data["token"]
untrusted_region = flask.request.args["region"]
if untrusted_region in ["us-central1"]:
region = untrusted_region
else:
return flask.Response(
status=200,
response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_REGION"}),
)
content = flask.request.get_json(silent=True)
if content["status"] is True:
fulfillment = "service-directory"
else:
fulfillment = "generic-web-service"
bucket = flask.request.args["bucket"]
webhook_name = flask.request.args["webhook_name"]
webhook_trigger_uri = (
f"https://{region}-{project_id}.cloudfunctions.net/{webhook_name}"
)
result = su.get_agents(token, project_id, region)
if "response" in result:
return result["response"]
agent_name = result["data"]["Telecommunications"]["name"]
result = su.get_webhooks(token, agent_name, project_id, region)
if "response" in result:
return result["response"]
webhook_dict = result["data"]["cxPrebuiltAgentsTelecom"]
webhook_name = webhook_dict["name"]
if fulfillment == "generic-web-service":
data = {
"displayName": "cxPrebuiltAgentsTelecom",
"genericWebService": {"uri": webhook_trigger_uri},
}
elif fulfillment == "service-directory":
def encode(msg_bytes):
return base64.b64encode(msg_bytes).decode("ascii")
data = {
"displayName": "cxPrebuiltAgentsTelecom",
"serviceDirectory": {
"service": (
f"projects/{project_id}/locations/{region}/"
f"namespaces/df-namespace"
f"/services/df-service"
),
"genericWebService": {
"uri": f"https://{DOMAIN}",
"allowedCaCerts": [encode(uu.get_cert(token, project_id, bucket))],
},
},
}
else:
return flask.Response( # pragma: no cover
status=500, response=f"Unexpected setting for fulfillment: {fulfillment}"
)
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = requests.patch(
f"https://{region}-dialogflow.googleapis.com/v3/{webhook_name}",
headers=headers,
json=data,
timeout=10,
)
if response.status_code != 200:
logger.info(
" dialogflow API unexpectedly rejected invocation PATCH request: %s",
response.text,
)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
response = flask.Response(status=200)
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)