webhooks-with-cloud-run/MicroServices/Webhook/webhook.py (52 lines of code) (raw):
# Copyright 2019 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hmac
import json
import os
import sys
import time
import urllib
from flask import Flask, request
from google.cloud import secretmanager_v1beta1
from hashlib import sha1
app = Flask(__name__)
@app.route("/", methods=["POST"])
def index():
signature = request.headers.get("X-Hub-Signature", None)
body = request.data
# Only process data with a valid signature
assert verify_signature(signature, body), "Unverified Signature"
publish_to_pubsub(body)
sys.stdout.flush()
return ("", 204)
def verify_signature(signature, body):
expected_signature = "sha1="
try:
# Get secret from Cloud Secret Manager
secret = get_secret(
os.environ.get("PROJECT_NAME"), os.environ.get("SECRET_NAME"), "1"
)
# Compute the hashed signature
hashed = hmac.new(secret, body, sha1)
expected_signature += hashed.hexdigest()
except Exception as e:
print(e)
return hmac.compare_digest(signature, expected_signature)
def publish_to_pubsub(msg):
"""
Publishes the message to Cloud Pub/Sub
"""
try:
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
os.environ.get("PROJECT_NAME"), os.environ.get("TOPIC_NAME")
)
# Pub/Sub data must be bytestring, attributes must be strings
future = publisher.publish(topic_path, data=msg)
exception = future.exception()
if exception:
raise Exception(exception)
print(f"Published message: {future.result()}")
except Exception as e:
# Log any exceptions to stackdriver
entry = dict(severity="WARNING", message=e)
print(entry)
def get_secret(project_name, secret_name, version_num):
# Returns secret payload from Cloud Secret Manager
client = secretmanager_v1beta1.SecretManagerServiceClient()
name = client.secret_version_path(project_name, secret_name, version_num)
secret = client.access_secret_version(name)
return secret.payload.data
if __name__ == "__main__":
PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 8080
# This is used when running locally. Gunicorn is used to run the
# application on Cloud Run. See entrypoint in Dockerfile.
app.run(host="127.0.0.1", port=PORT, debug=True)