in dialogflow-cx/vpc-sc-demo/backend/update_blueprint.py [0:0]
def update_webhook_ingress():
"""Update webhook ingress to allow/disallow external connections."""
data = su.get_token_and_project(flask.request)
if "response" in data:
return data["response"]
project_id, token = data["project_id"], data["token"]
region = flask.request.args["region"]
webhook_name = flask.request.args["webhook_name"]
content = flask.request.get_json(silent=True)
headers = {}
headers["Content-type"] = "application/json"
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = requests.get(
(
f"https://cloudfunctions.googleapis.com/v1/projects/{project_id}"
f"/locations/{region}/functions/{webhook_name}"
),
headers=headers,
timeout=10,
)
if response.status_code != 200:
logger.info(" cloudfunctions API rejected GET request: %s", response.text)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
webhook_data = response.json()
ingress_settings = "ALLOW_INTERNAL_ONLY" if content["status"] else "ALLOW_ALL"
if webhook_data["ingressSettings"] == ingress_settings:
return flask.Response(status=200)
webhook_data["ingressSettings"] = ingress_settings
response = requests.patch(
(
f"https://cloudfunctions.googleapis.com/v1/projects/{project_id}"
f"/locations/{region}/functions/{webhook_name}"
),
headers=headers,
json=webhook_data,
timeout=10,
)
if response.status_code != 200:
logger.info(" cloudfunctions API rejected PATCH request: %s", response.text)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
response = flask.Response(status=200)
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)