def get_agents()

in dialogflow-cx/vpc-sc-demo/backend/status_utilities.py [0:0]


def get_agents(token, project_id, region):  # pylint: disable=too-many-branches
    """Get agents using dialogflow API"""
    headers = {}
    headers["x-goog-user-project"] = project_id
    headers["Authorization"] = f"Bearer {token}"
    if region not in ["us-central1"]:
        return {
            "response": flask.Response(
                status=200,
                response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_REGION"}),
            )
        }
    result = requests.get(
        (
            f"https://{region}-dialogflow.googleapis.com/v3/"
            f"projects/{project_id}/locations/{region}/agents"
        ),
        headers=headers,
        timeout=10,
    )
    if result.status_code == 403:
        if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
            result.json()["error"]["message"].startswith(
                "Dialogflow API has not been used in project"
            )
        ):
            response = {
                "response": flask.Response(
                    status=200,
                    response=json.dumps(
                        {"status": "BLOCKED", "reason": "DIALOGFLOW_API_DISABLED"}
                    ),
                )
            }
        elif (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
            result.json()["error"]["message"].startswith(
                "Caller does not have required permission"
            )
        ):
            response = {
                "response": flask.Response(
                    status=200,
                    response=json.dumps(
                        {"status": "BLOCKED", "reason": "WRONG_PERMISSION"}
                    ),
                )
            }
        elif "details" in result.json()["error"]:
            response = None
            for details in result.json()["error"]["details"]:
                for violation in details["violations"]:
                    if violation["type"] == "VPC_SERVICE_CONTROLS":
                        response = {
                            "response": flask.Response(
                                status=200,
                                response=json.dumps(
                                    {
                                        "status": "BLOCKED",
                                        "reason": "VPC_SERVICE_CONTROLS",
                                    }
                                ),
                            )
                        }
            if response is None:
                response = {
                    "response": flask.Response(
                        status=200,
                        response=json.dumps(
                            {"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
                        ),
                    )
                }
    elif result.status_code != 200:
        logger.info("  dialogflow API rejected request: %s", result.text)
        response = {
            "response": flask.Response(
                status=result.status_code, response=json.dumps({"error": result.text})
            )
        }
    else:
        result_dict = result.json()
        if len(result_dict) == 0:
            response = {
                "response": flask.Response(
                    status=200,
                    response=json.dumps(
                        {"status": "BLOCKED", "reason": "AGENT_NOT_FOUND"}
                    ),
                )
            }
        elif "error" in result_dict:
            logger.info("  get_agents error: %s", result.text)
            # Seems like a potential bug; returning a dict? Also error resulting from 200 code?
            response = None
        else:
            response = {
                "data": {data["displayName"]: data for data in result_dict["agents"]}
            }
    return response