in dialogflow-cx/vpc-sc-demo/backend/update_blueprint.py [0:0]
def update_service_directory_webhook_fulfillment(): # pylint: disable=too-many-return-statements,too-many-locals
"""Update agent webhook; toggle between service directory and generic webhook."""
data = su.get_token_and_project(flask.request)
if "response" in data:
return data["response"]
project_id, token = data["project_id"], data["token"]
untrusted_region = flask.request.args["region"]
if untrusted_region in ["us-central1"]:
region = untrusted_region
else:
return flask.Response(
status=200,
response=json.dumps({"status": "BLOCKED", "reason": "UNKNOWN_REGION"}),
)
content = flask.request.get_json(silent=True)
if content["status"] is True:
fulfillment = "service-directory"
else:
fulfillment = "generic-web-service"
bucket = flask.request.args["bucket"]
webhook_name = flask.request.args["webhook_name"]
webhook_trigger_uri = (
f"https://{region}-{project_id}.cloudfunctions.net/{webhook_name}"
)
result = su.get_agents(token, project_id, region)
if "response" in result:
return result["response"]
agent_name = result["data"]["Telecommunications"]["name"]
result = su.get_webhooks(token, agent_name, project_id, region)
if "response" in result:
return result["response"]
webhook_dict = result["data"]["cxPrebuiltAgentsTelecom"]
webhook_name = webhook_dict["name"]
if fulfillment == "generic-web-service":
data = {
"displayName": "cxPrebuiltAgentsTelecom",
"genericWebService": {"uri": webhook_trigger_uri},
}
elif fulfillment == "service-directory":
def encode(msg_bytes):
return base64.b64encode(msg_bytes).decode("ascii")
data = {
"displayName": "cxPrebuiltAgentsTelecom",
"serviceDirectory": {
"service": (
f"projects/{project_id}/locations/{region}/"
f"namespaces/df-namespace"
f"/services/df-service"
),
"genericWebService": {
"uri": f"https://{DOMAIN}",
"allowedCaCerts": [encode(uu.get_cert(token, project_id, bucket))],
},
},
}
else:
return flask.Response( # pragma: no cover
status=500, response=f"Unexpected setting for fulfillment: {fulfillment}"
)
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = requests.patch(
f"https://{region}-dialogflow.googleapis.com/v3/{webhook_name}",
headers=headers,
json=data,
timeout=10,
)
if response.status_code != 200:
logger.info(
" dialogflow API unexpectedly rejected invocation PATCH request: %s",
response.text,
)
return flask.Response(
status=response.status_code, response=json.dumps({"error": response.text})
)
response = flask.Response(status=200)
return au.register_action(
flask.request, response, au.ACTIONS.UPDATE_STATUS, {"service": "ingress"}
)