in dialogflow-cx/vpc-sc-demo/backend/status_utilities.py [0:0]
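def get_access_policy_name(token, access_policy_title, project_id, error_code=200):
    """Resolve the Access Context Manager policy name for a project.

    Looks up the project's organization through the Cloud Resource Manager
    ``getAncestry`` call, lists the organization's access policies, and
    returns the policy whose title equals ``access_policy_title`` and whose
    scopes include ``projects/<project_number>``.

    Args:
        token: OAuth bearer token sent in the ``Authorization`` header of
            both API calls.
        access_policy_title: Title of the access policy to look for.
        project_id: Project whose ancestry and project number are resolved.
        error_code: HTTP status used for every BLOCKED response. The default
            is 200, so callers must read the JSON ``status``/``reason``
            fields rather than the HTTP status to detect a failure.

    Returns:
        ``{"access_policy_name": <name>}`` on success; otherwise
        ``{"response": flask.Response}`` whose JSON body carries
        ``status="BLOCKED"`` and a ``reason``. A failed ``get_project_number``
        result is passed through unchanged.
    """

    def blocked(reason):
        # Every failure path shares this envelope; only the reason differs.
        return {
            "response": flask.Response(
                status=error_code,
                response=json.dumps({"status": "BLOCKED", "reason": reason}),
            )
        }

    if not access_policy_title:
        return blocked("NO_ACCESS_POLICY")

    # The bearer token is only ever sent to the fixed googleapis.com hosts below.
    headers = {
        "Content-type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    # NOTE(review): project_id is interpolated into the URL path unescaped;
    # presumably it is validated by the caller - confirm.
    ancestry = requests.post(
        f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:getAncestry",
        headers=headers,
        timeout=10,
    )
    if ancestry.status_code != 200:
        return blocked("UNKNOWN_STATUS")

    organization_id = None
    for ancestor_dict in ancestry.json().get("ancestor", []):
        if ancestor_dict["resourceId"]["type"] == "organization":
            organization_id = ancestor_dict["resourceId"]["id"]
    if not organization_id:
        return blocked("NO_ORGANIZATION")

    project_lookup = get_project_number(token, project_id)
    if "response" in project_lookup:
        return project_lookup
    project_number = project_lookup["project_number"]

    policies = requests.get(
        (
            "https://accesscontextmanager.googleapis.com/v1/"
            f"accessPolicies?parent=organizations/{organization_id}"
        ),
        headers=headers,
        timeout=10,
    )
    # A denied or failed listing must not be reported as POLICY_NOT_FOUND:
    # that would present "cannot see the policies" as "no policy exists".
    if policies.status_code != 200:
        return blocked("UNKNOWN_STATUS")

    wanted_scope = f"projects/{project_number}"
    for policy in policies.json().get("accessPolicies", []):
        # A policy without a "scopes" field previously raised KeyError here;
        # treat it as not scoped to this project.
        if policy.get("title") == access_policy_title and wanted_scope in policy.get(
            "scopes", []
        ):
            return {"access_policy_name": policy["name"]}
    return blocked("POLICY_NOT_FOUND")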