def webhook_access_allow_unauthenticated_status()

in dialogflow-cx/vpc-sc-demo/backend/status_blueprint.py [0:0]


def webhook_access_allow_unauthenticated_status():  # pylint: disable=too-many-branches,too-many-return-statements
    """Get boolean status of allow unauthenticated webhook access."""
    data = su.get_token_and_project(flask.request)
    if "response" in data:
        return data["response"]
    project_id, token = data["project_id"], data["token"]
    region = flask.request.args["region"]
    webhook_name = flask.request.args["webhook_name"]

    response = su.check_function_exists(token, project_id, region, webhook_name)
    if "response" in response:
        return response["response"]

    headers = {}
    headers["x-goog-user-project"] = project_id
    headers["Authorization"] = f"Bearer {token}"
    result = requests.get(
        (
            "https://cloudfunctions.googleapis.com/v2/"
            f"projects/{project_id}/locations/{region}/"
            f"functions/{webhook_name}:getIamPolicy"
        ),
        headers=headers,
        timeout=10,
    )
    if result.status_code == 403:
        if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
            result.json()["error"]["message"].startswith(
                "Permission 'cloudfunctions.functions.getIamPolicy' denied"
            )
        ):
            return flask.Response(
                status=200,
                response=json.dumps(
                    {"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
                ),
            )
        if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
            result.json()["error"]["message"].startswith(
                "Cloud Functions API has not been used in project"
            )
        ):
            return flask.Response(
                status=200,
                response=json.dumps(
                    {"status": "BLOCKED", "reason": "CLOUDFUNCTIONS_API_DISABLED"}
                ),
            )
        for details in result.json()["error"]["details"]:
            for violation in details["violations"]:
                if violation["type"] == "VPC_SERVICE_CONTROLS":
                    return flask.Response(
                        status=200,
                        response=json.dumps(
                            {"status": "BLOCKED", "reason": "VPC_SERVICE_CONTROLS"}
                        ),
                    )
        return flask.Response(status=500, response=result.text)
    if result.status_code != 200:
        logger.info("  cloudfunctions API rejected request: %s", result.text)
        return flask.abort(result.status_code)
    policy_dict = result.json()
    all_users_is_invoker_member = False
    for binding in policy_dict.get("bindings", []):
        for member in binding.get("members", []):
            if (
                member == "allUsers"
                and binding["role"] == "roles/cloudfunctions.invoker"
            ):
                all_users_is_invoker_member = True

    logger.info("  all_users_is_invoker_member: %s", all_users_is_invoker_member)
    if all_users_is_invoker_member:
        return flask.Response(status=200, response=json.dumps({"status": False}))
    return flask.Response(status=200, response=json.dumps({"status": True}))