in dialogflow-cx/vpc-sc-demo/backend/status_utilities.py [0:0]
def check_function_exists(token, project_id, region, function_name):
"""Check if function exists using cloudfunctions api."""
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
result = requests.get(
(
f"https://cloudfunctions.googleapis.com/v1/"
f"projects/{project_id}/locations/{region}/functions/{function_name}"
),
headers=headers,
timeout=10,
)
if result.status_code == 200:
response = {"status": "OK"}
elif result.status_code == 404 and result.json()["error"]["status"] == "NOT_FOUND":
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "WEBHOOK_NOT_FOUND"}
),
)
}
elif result.status_code == 403 and result.json()["error"]["message"].startswith(
"Cloud Functions API has not been used in project"
):
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "CLOUDFUNCTIONS_API_DISABLED"}
),
)
}
elif result.status_code == 403:
if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
result.json()["error"]["message"].startswith(
"Permission 'cloudfunctions.functions.get' denied on resource"
)
):
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
),
)
}
else:
response = None
for details in result.json()["error"]["details"]:
for violation in details["violations"]:
if violation["type"] == "VPC_SERVICE_CONTROLS":
response = {
"response": flask.Response(
status=200,
response=json.dumps(
{
"status": "BLOCKED",
"reason": "VPC_SERVICE_CONTROLS",
}
),
)
}
if response is None:
response = {
"response": flask.Response(status=500, response=result.text)
}
else:
response = {
"response": flask.Response(status=500, response=json.dumps(result.json()))
}
return response