def get_webhooks()

in dialogflow-cx/vpc-sc-demo/backend/status_utilities.py [0:0]


def _is_vpc_sc_violation(result):
    """Return True if the response body reports a VPC_SERVICE_CONTROLS violation.

    The body is parsed defensively: a non-JSON body, a missing "error" or
    "details" field, or detail entries without a "violations" list all
    yield False instead of raising.
    """
    try:
        payload = result.json()
    except ValueError:
        # Body is not valid JSON, so it cannot carry a violation report.
        return False
    error = payload.get("error") if isinstance(payload, dict) else None
    if not isinstance(error, dict):
        return False
    for details in error.get("details") or []:
        if not isinstance(details, dict):
            continue
        # Not every detail entry carries a "violations" list; skip those.
        for violation in details.get("violations") or []:
            if (
                isinstance(violation, dict)
                and violation.get("type") == "VPC_SERVICE_CONTROLS"
            ):
                return True
    return False


def get_webhooks(token, agent_name, project_id, region):
    """Get webhooks using dialogflow API.

    Args:
        token: Bearer token sent in the Authorization header.
        agent_name: Agent resource name, interpolated into the request URL
            directly before "/webhooks".
        project_id: Project sent in the "x-goog-user-project" header.
        region: Region prefix of the dialogflow.googleapis.com host.

    Returns:
        A dict with exactly one key:
          * "data": mapping of each webhook's "displayName" to the webhook
            dict, when the request succeeds and webhooks are listed.
          * "response": a flask.Response, when the request was blocked by
            VPC Service Controls (status 200, BLOCKED/VPC_SERVICE_CONTROLS),
            when the API returned any other non-200 status (that status is
            propagated with the raw body under "error"), or when the reply
            contains no "webhooks" field (status 200,
            BLOCKED/WEBHOOK_NOT_FOUND).
    """
    headers = {
        "x-goog-user-project": project_id,
        "Authorization": f"Bearer {token}",
    }
    result = requests.get(
        f"https://{region}-dialogflow.googleapis.com/v3/{agent_name}/webhooks",
        headers=headers,
        timeout=10,
    )
    # A 403 is only reported as a VPC-SC block when the body says so; any
    # other 403 (or one with an unexpected body shape) falls through to the
    # generic error handling below instead of raising KeyError.
    if result.status_code == 403 and _is_vpc_sc_violation(result):
        response = flask.Response(
            status=200,
            response=json.dumps(
                {"status": "BLOCKED", "reason": "VPC_SERVICE_CONTROLS"}
            ),
        )
        return {"response": response}
    if result.status_code != 200:
        logger.info("  dialogflow API rejected request: %s", result.text)
        response = flask.Response(
            status=result.status_code, response=json.dumps({"error": result.text})
        )
        return {"response": response}
    payload = result.json()
    if "webhooks" not in payload:
        return {
            "response": flask.Response(
                status=200,
                response=json.dumps(
                    {"status": "BLOCKED", "reason": "WEBHOOK_NOT_FOUND"}
                ),
            )
        }
    return {"data": {data["displayName"]: data for data in payload["webhooks"]}}