dialogflow-cx/vpc-sc-demo/components/reverse_proxy_server/proxy-server-src/app.py (69 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=E1101
"""Reverse proxy server to redirect incoming requests to a webhook trigger uri."""

import logging
import os
import signal

import google.auth
import google.auth.exceptions
import requests
from flask import Flask, Response, abort, request
from google.auth.transport import requests as reqs
from google.oauth2 import id_token

app = Flask(__name__)

# Reuse the handlers and level of the "gunicorn.error" logger for the Flask
# application logger.
gunicorn_logger = logging.getLogger("gunicorn.error")
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)

# Allowlist of caller emails; read once at import time from BOT_USER.
authorized_emails = [os.environ["BOT_USER"]]

# Scheme prefix expected at the start of the Authorization header value.
_BEARER_PREFIX = "Bearer "


def _decode_identity_token(token):
    """Return the verified claims of ``token``, or None if verification fails.

    The token may be either a Firebase token or a Google OAuth2 ID token, and
    a failed verification raises instead of returning, so each verifier is
    tried in turn (Firebase first) and the first successful result is used.

    Args:
        token: The bearer token string taken from the Authorization header.

    Returns:
        The value returned by the first verifier that accepts the token, or
        None when every verifier rejects it.
    """
    verifiers = (id_token.verify_firebase_token, id_token.verify_oauth2_token)
    for verify in verifiers:
        try:
            info = verify(token, reqs.Request())
        except (ValueError, google.auth.exceptions.GoogleAuthError):
            # ValueError covers ordinary verification failures.
            # GoogleAuthError is not a ValueError; it is caught as well so
            # that any google-auth failure during verification (including
            # its subclasses) denies the request with a 403 rather than
            # escaping as an unhandled server error.
            continue
        if info is not None:
            return info
    return None


@app.before_request
def check_user_authentication():  # pylint: disable=R1710
    """Validates that caller is in the allowlist authorized_emails.

    Runs before every request. The request is rejected with HTTP 403 when the
    Authorization header is missing, does not start with "Bearer ", carries a
    token that cannot be verified, carries a token without an "email" claim,
    or carries an email that is not in ``authorized_emails``. Returning None
    lets Flask continue to the route handler.
    """
    app.logger.info("[0] Begin check_user_authentication")

    auth = request.headers.get("Authorization", None)
    if auth is None:
        return abort(403)
    if not auth.startswith(_BEARER_PREFIX):
        return abort(403)
    token = auth[len(_BEARER_PREFIX):]  # Remove the "Bearer " prefix

    info = _decode_identity_token(token)
    if info is None:
        return abort(403)
    if "email" not in info:
        return abort(403)

    verified_email = info["email"]
    app.logger.info("[0] User: %s", verified_email)
    if verified_email not in authorized_emails:
        return abort(403)


@app.post("/")
def root() -> Response:
    """Redirect request to webhook trigger.

    Fetches an ID token whose audience is the WEBHOOK_TRIGGER_URI environment
    variable, POSTs the incoming JSON body to that URI with the token as a
    bearer credential, and relays the webhook's status code and response text
    back to the caller.

    Returns:
        A Response carrying the webhook's status code and body text.
    """
    app.logger.info('Endpoint "webhook" triggered')
    audience = os.environ["WEBHOOK_TRIGGER_URI"]
    app.logger.info("WEBHOOK_TRIGGER_URI: %s", audience)

    token = id_token.fetch_id_token(reqs.Request(), audience)
    new_headers = {
        "Content-type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    result = requests.post(
        audience, json=request.get_json(), headers=new_headers, timeout=10
    )

    if result.status_code != 200:
        app.logger.info("Webhook Response error code: %s", result.status_code)
        app.logger.info("Webhook Response error %s", result.text)
    else:
        app.logger.info("Webhook Response: %s", result.status_code)
    return Response(status=result.status_code, response=result.text)


def shutdown_handler(signal_int, frame) -> None:
    """Safely exit program.

    Args:
        signal_int: Number of the signal that was received.
        frame: Stack frame passed by the signal machinery; unused.

    Raises:
        SystemExit: Always, with exit code 0.
    """
    del frame
    app.logger.info("Caught Signal %s", signal.strsignal(signal_int))
    raise SystemExit(0)


if __name__ == "__main__":  # pragma: no cover
    # Running application locally, outside of a Google Cloud Environment

    # handles Ctrl-C termination
    signal.signal(signal.SIGINT, shutdown_handler)

    app.run(host="localhost", port=8080, debug=False)
else:
    # handles Cloud Run container termination
    signal.signal(signal.SIGTERM, shutdown_handler)