def update_webhook_access()

in dialogflow-cx/vpc-sc-demo/backend/update_blueprint.py [0:0]


def update_webhook_access():  # pylint: disable=too-many-branches
    """Update webhook access to allow/disallow allUsers.

    Reads ``region`` and ``webhook_name`` from the query string and a JSON
    body of the form ``{"status": <bool>}``.  A truthy ``status`` means the
    webhook should be internal-only (``allUsers`` must NOT hold the Cloud
    Functions invoker role); a falsy ``status`` means ``allUsers`` should
    hold it.

    Returns:
        * the ``"response"`` entry produced by ``su.get_token_and_project``
          when that helper supplies one;
        * a 400 response when the JSON body is missing, is not an object, or
          lacks the ``"status"`` field;
        * a response mirroring the upstream status code and body text when
          the getIamPolicy or setIamPolicy call does not return 200;
        * a bare 200 response when the policy already matches the request;
        * otherwise the result of ``au.register_action`` for the update.
    """
    data = su.get_token_and_project(flask.request)
    if "response" in data:
        return data["response"]
    project_id, token = data["project_id"], data["token"]
    region = flask.request.args["region"]
    webhook_name = flask.request.args["webhook_name"]

    # get_json(silent=True) yields None for a missing/invalid body; reject it
    # explicitly instead of failing on the subscript below.
    content = flask.request.get_json(silent=True)
    if not isinstance(content, dict) or "status" not in content:
        return flask.Response(
            status=400,
            response=json.dumps(
                {"error": 'Request body must be a JSON object with a "status" field.'}
            ),
        )
    internal_only = bool(content["status"])

    headers = {
        "x-goog-user-project": project_id,
        "Authorization": f"Bearer {token}",
    }
    # NOTE(review): the policy is read through the v2 endpoint but written
    # through the v1 endpoint below -- confirm this mix is intentional.
    response = requests.get(
        (
            f"https://cloudfunctions.googleapis.com/v2/projects/{project_id}"
            f"/locations/{region}/functions/{webhook_name}:getIamPolicy"
        ),
        headers=headers,
        timeout=10,
    )
    if response.status_code != 200:
        logger.info(
            "  cloudfunctions API rejected getIamPolicy GET request: %s", response.text
        )
        return flask.Response(
            status=response.status_code, response=json.dumps({"error": response.text})
        )

    policy_dict = response.json()
    if internal_only != _all_users_is_invoker(policy_dict):
        # internal_only matches request; no change needed
        return flask.Response(status=200)

    _set_all_users_invoker(policy_dict, allow=not internal_only)

    response = requests.post(
        (
            f"https://cloudfunctions.googleapis.com/v1/projects/{project_id}"
            f"/locations/{region}/functions/{webhook_name}:setIamPolicy"
        ),
        headers=headers,
        json={"policy": policy_dict},
        timeout=10,
    )
    if response.status_code != 200:
        logger.info(
            "  cloudfunctions API rejected setIamPolicy POST request: %s", response.text
        )
        return flask.Response(
            status=response.status_code, response=json.dumps({"error": response.text})
        )
    response = flask.Response(status=200)
    return au.register_action(
        flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
    )


_INVOKER_ROLE = "roles/cloudfunctions.invoker"
_ALL_USERS = "allUsers"


def _all_users_is_invoker(policy: dict) -> bool:
    """Return True if any invoker-role binding in *policy* lists allUsers."""
    return any(
        binding.get("role") == _INVOKER_ROLE
        and _ALL_USERS in binding.get("members", [])
        for binding in policy.get("bindings", [])
    )


def _set_all_users_invoker(policy: dict, *, allow: bool) -> None:
    """Add allUsers to (or strip it from) the invoker bindings of *policy* in place."""
    if not allow:
        for binding in policy.get("bindings", []):
            if binding.get("role") == _INVOKER_ROLE and binding.get("members"):
                binding["members"] = [
                    member for member in binding["members"] if member != _ALL_USERS
                ]
        return

    bindings = policy.setdefault("bindings", [])
    invoker_role_exists = False
    for binding in bindings:
        if binding.get("role") == _INVOKER_ROLE:
            invoker_role_exists = True
            members = binding.setdefault("members", [])
            if _ALL_USERS not in members:
                members.append(_ALL_USERS)
    if not invoker_role_exists:
        # No invoker binding yet: create one that grants allUsers.
        bindings.append({"role": _INVOKER_ROLE, "members": [_ALL_USERS]})