dialogflow-cx/vpc-sc-demo/backend/status_utilities.py (437 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility Module to get status of project assets."""
import json
import logging
import flask
import get_token
import requests
logger = logging.getLogger(__name__)
def get_project_number(token, project_id):
"""Get project number using cloudresourcemanager API."""
headers = {}
headers["Content-type"] = "application/json"
headers["Authorization"] = f"Bearer {token}"
result = requests.get(
f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}",
headers=headers,
timeout=10,
)
if "projectNumber" in result.json():
return {"project_number": result.json()["projectNumber"]}
return {
"response": flask.Response(
status=200,
response=json.dumps({"status": "BLOCKED", "reason": "NO_PROJECT"}),
)
}
def get_access_policy_name(token, access_policy_title, project_id, error_code=200):
"""Get access policy name using cloudresourcemanager API."""
if not access_policy_title:
return {
"response": flask.Response(
status=error_code,
response=json.dumps(
{"status": "BLOCKED", "reason": "NO_ACCESS_POLICY"}
),
)
}
headers = {}
headers["Content-type"] = "application/json"
headers["Authorization"] = f"Bearer {token}"
response = requests.post(
f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:getAncestry",
headers=headers,
timeout=10,
)
if response.status_code != 200:
return {
"response": flask.Response(
status=error_code,
response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_STATUS"}),
)
}
organization_id = None
for ancestor_dict in response.json().get("ancestor", []):
if ancestor_dict["resourceId"]["type"] == "organization":
organization_id = ancestor_dict["resourceId"]["id"]
if not organization_id:
return {
"response": flask.Response(
status=error_code,
response=json.dumps({"status": "BLOCKED", "reason": "NO_ORGANIZATION"}),
)
}
response = get_project_number(token, project_id)
if "response" in response:
return response
project_number = response["project_number"]
headers = {}
headers["Content-type"] = "application/json"
headers["Authorization"] = f"Bearer {token}"
response = requests.get(
(
"https://accesscontextmanager.googleapis.com/v1/"
f"accessPolicies?parent=organizations/{organization_id}"
),
headers=headers,
timeout=10,
)
for policy in response.json().get("accessPolicies", []):
if policy["title"] == access_policy_title:
if f"projects/{project_number}" in policy["scopes"]:
return {"access_policy_name": policy["name"]}
return {
"response": flask.Response(
status=error_code,
response=json.dumps({"status": "BLOCKED", "reason": "POLICY_NOT_FOUND"}),
)
}
def get_service_perimeter_data_uri(
token,
project_id,
access_policy_name,
perimeter_title="df_webhook",
):
"""Get uri for for service perimeter."""
access_policy_id = access_policy_name.split("/")[1]
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = requests.get(
(
f"https://accesscontextmanager.googleapis.com/v1/"
f"accessPolicies/{access_policy_id}/servicePerimeters"
),
headers=headers,
timeout=10,
)
if response.status_code != 200:
if (response.json()["error"]["status"] == "PERMISSION_DENIED") and (
response.json()["error"]["message"].startswith(
"Access Context Manager API has not been used in project"
)
):
response = flask.Response(
status=200,
response=json.dumps(
{
"status": "BLOCKED",
"reason": "ACCESS_CONTEXT_MANAGER_API_DISABLED",
}
),
)
return {"response": response}
if response.json()["error"]["status"] == "PERMISSION_DENIED":
response = flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
),
)
return {"response": response}
logger.info(" accesscontextmanager API rejected request: %s", response.text)
return {"response": flask.Response(status=500, response=response.text)}
for service_perimeter_dict in response.json().get("servicePerimeters", []):
if service_perimeter_dict["title"] == perimeter_title:
return {
"uri": (
"https://accesscontextmanager.googleapis.com/v1/"
f'{service_perimeter_dict["name"]}'
)
}
return {
"response": flask.Response(
status=200,
response=json.dumps({"status": "BLOCKED", "reason": "PERIMETER_NOT_FOUND"}),
)
}
def get_service_perimeter_status(token, project_id, access_policy_name):
"""Get service perimeter status using accesscontextmanager API."""
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = get_service_perimeter_data_uri(token, project_id, access_policy_name)
if "response" in response:
return response
service_perimeter_data_uri = response["uri"]
result = requests.get(service_perimeter_data_uri, headers=headers, timeout=10)
if result.status_code != 200:
logger.info(" accesscontextmanager API rejected request: %s", result.text)
if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
result.json()["error"]["message"].startswith(
"Access Context Manager API has not been used in project"
)
):
# Potential bug: should return a dict?
return flask.Response(
status=200,
response=json.dumps(
{
"status": "BLOCKED",
"reason": "ACCESS_CONTEXT_MANAGER_API_DISABLED",
}
),
)
if result.json()["error"]["status"] == "PERMISSION_DENIED":
response = flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
),
)
return {"response": response}
response = flask.Response(status=result.status_code, response=result.text)
return {"response": response}
return result.json()
def get_restricted_services_status(token, project_id, access_policy_name):
"""Check which services are restricted using accesscontextmanager API."""
service_perimeter_status = get_service_perimeter_status(
token, project_id, access_policy_name
)
if "response" in service_perimeter_status:
return service_perimeter_status
status_dict = {}
if "restrictedServices" not in service_perimeter_status["status"]:
status_dict["cloudfunctions_restricted"] = False
status_dict["dialogflow_restricted"] = False
else:
status_dict["cloudfunctions_restricted"] = (
"cloudfunctions.googleapis.com"
in service_perimeter_status["status"]["restrictedServices"]
)
status_dict["dialogflow_restricted"] = (
"dialogflow.googleapis.com"
in service_perimeter_status["status"]["restrictedServices"]
)
return status_dict
def check_function_exists(token, project_id, region, function_name):
"""Check if function exists using cloudfunctions api."""
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
result = requests.get(
(
f"https://cloudfunctions.googleapis.com/v1/"
f"projects/{project_id}/locations/{region}/functions/{function_name}"
),
headers=headers,
timeout=10,
)
if result.status_code == 200:
response = {"status": "OK"}
elif result.status_code == 404 and result.json()["error"]["status"] == "NOT_FOUND":
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "WEBHOOK_NOT_FOUND"}
),
)
}
elif result.status_code == 403 and result.json()["error"]["message"].startswith(
"Cloud Functions API has not been used in project"
):
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "CLOUDFUNCTIONS_API_DISABLED"}
),
)
}
elif result.status_code == 403:
if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
result.json()["error"]["message"].startswith(
"Permission 'cloudfunctions.functions.get' denied on resource"
)
):
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
),
)
}
else:
response = None
for details in result.json()["error"]["details"]:
for violation in details["violations"]:
if violation["type"] == "VPC_SERVICE_CONTROLS":
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{
"status": "BLOCKED",
"reason": "VPC_SERVICE_CONTROLS",
}
),
)
}
if response is None:
response = {
"response": flask.Response(status=500, response=result.text)
}
else:
response = {
"response": flask.Response(status=500, response=json.dumps(result.json()))
}
return response
def get_agents(token, project_id, region): # pylint: disable=too-many-branches
"""Get agents using dialogflow API"""
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
if region not in ["us-central1"]:
return {
"response": flask.Response(
status=200,
response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_REGION"}),
)
}
result = requests.get(
(
f"https://{region}-dialogflow.googleapis.com/v3/"
f"projects/{project_id}/locations/{region}/agents"
),
headers=headers,
timeout=10,
)
if result.status_code == 403:
if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
result.json()["error"]["message"].startswith(
"Dialogflow API has not been used in project"
)
):
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "DIALOGFLOW_API_DISABLED"}
),
)
}
elif (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
result.json()["error"]["message"].startswith(
"Caller does not have required permission"
)
):
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "WRONG_PERMISSION"}
),
)
}
elif "details" in result.json()["error"]:
response = None
for details in result.json()["error"]["details"]:
for violation in details["violations"]:
if violation["type"] == "VPC_SERVICE_CONTROLS":
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{
"status": "BLOCKED",
"reason": "VPC_SERVICE_CONTROLS",
}
),
)
}
if response is None:
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
),
)
}
elif result.status_code != 200:
logger.info(" dialogflow API rejected request: %s", result.text)
response = {
"response": flask.Response(
status=result.status_code, response=json.dumps({"error": result.text})
)
}
else:
result_dict = result.json()
if len(result_dict) == 0:
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "AGENT_NOT_FOUND"}
),
)
}
elif "error" in result_dict:
logger.info(" get_agents error: %s", result.text)
# Seems like a potential bug; returning a dict? Also error resulting from 200 code?
response = None
else:
response = {
"data": {data["displayName"]: data for data in result_dict["agents"]}
}
return response
def get_webhooks(token, agent_name, project_id, region):
"""Get webhooks using dialogflow API."""
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
result = requests.get(
f"https://{region}-dialogflow.googleapis.com/v3/{agent_name}/webhooks",
headers=headers,
timeout=10,
)
if result.status_code == 403:
for details in result.json()["error"]["details"]:
for violation in details["violations"]:
if violation["type"] == "VPC_SERVICE_CONTROLS":
response = flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "VPC_SERVICE_CONTROLS"}
),
)
return {"response": response}
if result.status_code != 200:
logger.info(" dialogflow API rejected request: %s", result.text)
response = flask.Response(
status=result.status_code, response=json.dumps({"error": result.text})
)
return {"response": response}
agents = result.json()
if "webhooks" not in agents:
return {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "WEBHOOK_NOT_FOUND"}
),
)
}
return {"data": {data["displayName"]: data for data in agents["webhooks"]}}
def get_token_and_project(request):
"""Helper method to retrieve a token or project, or return early."""
response = {}
token_dict = get_token.get_token(request, token_type="access_token")
if "response" in token_dict:
return token_dict
response["token"] = token_dict["access_token"]
response["project_id"] = request.args.get("project_id", None)
if not response["project_id"]:
return {
"response": flask.Response(
status=200,
response=json.dumps({"status": "BLOCKED", "reason": "NO_PROJECT_ID"}),
)
}
return response
def get_restricted_service_status(request, service_key):
"""Get status of restricted service:"""
data = get_token_and_project(request)
if "response" in data:
return data["response"]
project_id, token = data["project_id"], data["token"]
access_policy_title = request.args.get("access_policy_title", None)
response = get_access_policy_name(token, access_policy_title, project_id)
if "response" in response:
return response["response"]
access_policy_name = response["access_policy_name"]
status_dict = get_restricted_services_status(token, project_id, access_policy_name)
if "response" in status_dict:
return status_dict["response"]
return flask.Response(
status=200,
response=json.dumps({"status": status_dict[service_key]}),
)