webhooks-with-cloud-run/Monolith/main.py (119 lines of code) (raw):
# Copyright 2019 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hmac
import json
import os
import sys
import time
import urllib
from flask import Flask, request
from google.cloud import secretmanager_v1beta1
from hashlib import sha1
app = Flask(__name__)
@app.route("/", methods=["POST"])
def index():
signature = request.headers.get("X-Hub-Signature", None)
body = request.data
# Only process data with a valid signature
assert verify_signature(signature, body), "Unverified Signature"
# Load the event as JSON for easier handling
event = request.get_json(force=True)
# Insert row into bigquery
insert_row_into_bigquery(event)
# Post new issues to Slack
if event["action"] == "opened":
issue_title = event["issue"]["title"]
issue_url = event["issue"]["html_url"]
send_issue_notification_to_slack(issue_title, issue_url)
# Post response to Github
create_issue_comment(event["issue"]["url"])
print("Yay")
sys.stdout.flush()
return ("", 204)
def verify_signature(signature, body):
expected_signature = "sha1="
try:
# Get secret from Cloud Secret Manager
secret = get_secret(
os.environ.get("PROJECT_NAME"), os.environ.get("SECRET_NAME"), "1"
)
# Compute the hashed signature
hashed = hmac.new(secret, body, sha1)
expected_signature += hashed.hexdigest()
except Exception as e:
print(e)
return hmac.compare_digest(signature, expected_signature)
def send_issue_notification_to_slack(issue_title, issue_url):
# Sends a message to Slack Channel
msg = {
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"New issue created: <{issue_url}|{issue_title}>",
},
}
]
}
req = urllib.request.Request(
os.environ.get("SLACK_URL"),
data=json.dumps(msg).encode("utf8"),
headers={"Content-Type": "application/json"},
)
response = urllib.request.urlopen(req)
def insert_row_into_bigquery(event):
from google.cloud import bigquery
# Set up bigquery instance
client = bigquery.Client()
dataset_id = os.environ.get("DATASET")
table_id = os.environ.get("TABLE")
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref)
# Insert row
row_to_insert = [
(
event["issue"]["title"],
event["action"],
event["issue"]["html_url"],
time.time(),
)
]
bq_errors = client.insert_rows(table, row_to_insert)
# If errors, log to Stackdriver
if bq_errors:
entry = {
"severity": "WARNING",
"msg": "Row not inserted.",
"errors": bq_errors,
"row": row_to_insert,
}
print(json.dumps(entry))
def create_issue_comment(api_url):
# Posts an auto response to Github Issue
# Get tokens
pem = get_secret(os.environ.get("PROJECT_NAME"), os.environ.get("PEM"), "1")
app_token = get_jwt(pem)
installation_token = get_installation_token(app_token)
# Create Github issue comment via HTTP POST
try:
msg = {
"body": "Thank you for filing an issue. \
Someone will respond within 24 hours."
}
req = urllib.request.Request(
api_url + "/comments", data=json.dumps(msg).encode("utf8")
)
req.add_header("Authorization", f"Bearer {installation_token}")
response = urllib.request.urlopen(req)
except Exception as e:
print(e)
def get_jwt(pem):
# Encodes and returns JWT
from jwt import JWT, jwk_from_pem
payload = {
"iat": int(time.time()),
"exp": int(time.time()) + (10 * 60),
"iss": os.environ.get("APP_ID"),
}
jwt = JWT()
return jwt.encode(payload, jwk_from_pem(pem), "RS256")
def get_installation_token(jwt):
# Get App installation token to use Github API
req = urllib.request.Request(os.environ.get("INSTALLATION"), method="POST")
req.add_header("Authorization", f"Bearer {jwt}")
req.add_header("Accept", "application/vnd.github.machine-man-preview+json")
response = urllib.request.urlopen(req)
token_json = json.loads(response.read())
return token_json["token"]
def get_secret(project_name, secret_name, version_num):
# Returns secret payload from Cloud Secret Manager
client = secretmanager_v1beta1.SecretManagerServiceClient()
name = client.secret_version_path(project_name, secret_name, version_num)
secret = client.access_secret_version(name)
return secret.payload.data
if __name__ == "__main__":
PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 8080
# This is used when running locally. Gunicorn is used to run the
# application on Cloud Run. See entrypoint in Dockerfile.
app.run(host="127.0.0.1", port=PORT, debug=True)