def check_function_exists()

in dialogflow-cx/vpc-sc-demo/backend/status_utilities.py [0:0]


def check_function_exists(token, project_id, region, function_name):
    """Check if function exists using cloudfunctions api."""
    headers = {}
    headers["x-goog-user-project"] = project_id
    headers["Authorization"] = f"Bearer {token}"
    result = requests.get(
        (
            f"https://cloudfunctions.googleapis.com/v1/"
            f"projects/{project_id}/locations/{region}/functions/{function_name}"
        ),
        headers=headers,
        timeout=10,
    )
    if result.status_code == 200:
        response = {"status": "OK"}
    elif result.status_code == 404 and result.json()["error"]["status"] == "NOT_FOUND":
        response = {
            "response": flask.Response(
                status=200,
                response=json.dumps(
                    {"status": "BLOCKED", "reason": "WEBHOOK_NOT_FOUND"}
                ),
            )
        }
    elif result.status_code == 403 and result.json()["error"]["message"].startswith(
        "Cloud Functions API has not been used in project"
    ):
        response = {
            "response": flask.Response(
                status=200,
                response=json.dumps(
                    {"status": "BLOCKED", "reason": "CLOUDFUNCTIONS_API_DISABLED"}
                ),
            )
        }
    elif result.status_code == 403:
        if (result.json()["error"]["status"] == "PERMISSION_DENIED") and (
            result.json()["error"]["message"].startswith(
                "Permission 'cloudfunctions.functions.get' denied on resource"
            )
        ):
            response = {
                "response": flask.Response(
                    status=200,
                    response=json.dumps(
                        {"status": "BLOCKED", "reason": "PERMISSION_DENIED"}
                    ),
                )
            }
        else:
            response = None
            for details in result.json()["error"]["details"]:
                for violation in details["violations"]:
                    if violation["type"] == "VPC_SERVICE_CONTROLS":
                        response = {
                            "response": flask.Response(
                                status=200,
                                response=json.dumps(
                                    {
                                        "status": "BLOCKED",
                                        "reason": "VPC_SERVICE_CONTROLS",
                                    }
                                ),
                            )
                        }
            if response is None:
                response = {
                    "response": flask.Response(status=500, response=result.text)
                }
    else:
        response = {
            "response": flask.Response(status=500, response=json.dumps(result.json()))
        }
    return response