in dialogflow-cx/vpc-sc-demo/backend/status_utilities.py [0:0]
def get_webhooks(token, agent_name, project_id, region):
"""Get webhooks using dialogflow API."""
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
result = requests.get(
f"https://{region}-dialogflow.googleapis.com/v3/{agent_name}/webhooks",
headers=headers,
timeout=10,
)
if result.status_code == 403:
for details in result.json()["error"]["details"]:
for violation in details["violations"]:
if violation["type"] == "VPC_SERVICE_CONTROLS":
response = flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "VPC_SERVICE_CONTROLS"}
),
)
return {"response": response}
if result.status_code != 200:
logger.info(" dialogflow API rejected request: %s", result.text)
response = flask.Response(
status=result.status_code, response=json.dumps({"error": result.text})
)
return {"response": response}
agents = result.json()
if "webhooks" not in agents:
return {
"response": flask.Response(
status=200,
response=json.dumps(
{"status": "BLOCKED", "reason": "WEBHOOK_NOT_FOUND"}
),
)
}
return {"data": {data["displayName"]: data for data in agents["webhooks"]}}