# webhooks-with-cloud-run/MicroServices/BigQuery/bigquery.py
# Copyright 2019 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hmac
import json
import os
import sys
import time
import urllib
from hashlib import sha1
from flask import Flask, request
from google.cloud import secretmanager_v1beta1
from google.cloud import bigquery
# Flask application object; the route handler in this module is registered on it.
app = Flask(__name__)
@app.route("/", methods=["POST"])
def index():
    """Receive a Pub/Sub push request and insert its payload into BigQuery.

    The request body is expected to be a JSON envelope containing a
    "message" object whose "data" field holds base64-encoded, UTF-8 JSON
    text. The decoded object is passed to insert_row_into_bigquery().

    Returns:
        ("", 204) once the payload has been handed to
        insert_row_into_bigquery(), or a ("Bad Request: ...", 400) tuple
        when the envelope or its data cannot be parsed.
    """
    # silent=True makes an unparsable body come back as None, so it is
    # rejected by the checks below instead of raising inside Flask.
    envelope = request.get_json(silent=True)

    data = None
    problem = None
    if not envelope:
        # Check if valid JSON
        problem = "Expecting JSON payload"
    elif not isinstance(envelope, dict) or "message" not in envelope:
        # Check if valid pub/sub message
        problem = "Not a valid Pub/Sub Message"
    else:
        msg = envelope["message"]
        try:
            data = json.loads(
                base64.b64decode(msg["data"]).decode("utf-8").strip()
            )
        except (KeyError, TypeError, ValueError) as err:
            # KeyError: no "data" field; TypeError: message/data has the
            # wrong type; ValueError: bad base64, bad UTF-8 or bad JSON.
            problem = "Invalid Pub/Sub message data: {!r}".format(err)

    if problem is not None:
        # Report the rejection as a structured log line and answer with a
        # client error rather than an unhandled exception.
        print(json.dumps({"severity": "WARNING", "msg": problem}))
        sys.stdout.flush()
        return ("Bad Request: " + problem, 400)

    # Insert row into bigquery
    insert_row_into_bigquery(data)

    print("Yay")
    sys.stdout.flush()
    return ("", 204)
def insert_row_into_bigquery(data):
    """Append one row describing the event in *data* to a BigQuery table.

    The dataset and table names are read from the DATASET and TABLE
    environment variables. The row holds, in order, the issue title, the
    action, the issue URL and the current time.time() value. When
    insert_rows() reports errors they are printed as a JSON entry with
    severity WARNING; nothing is raised or returned for them.

    Args:
        data: Mapping with an "action" key and an "issue" mapping that
            provides "title" and "html_url".
    """
    bq = bigquery.Client()

    # Resolve the destination table from the environment
    dataset_name = os.environ.get("DATASET")
    table_name = os.environ.get("TABLE")
    destination = bq.get_table(bq.dataset(dataset_name).table(table_name))

    # Build the single row to insert
    issue = data["issue"]
    rows = [(issue["title"], data["action"], issue["html_url"], time.time())]

    failures = bq.insert_rows(destination, rows)
    if not failures:
        return

    # Insert reported errors: print a structured warning (the original
    # comment describes this as logging to Stackdriver).
    print(
        json.dumps(
            {
                "severity": "WARNING",
                "msg": "Row not inserted.",
                "errors": failures,
                "row": rows,
            }
        )
    )
if __name__ == "__main__":
    # Fall back to 8080 when PORT is unset or empty.
    PORT = int(os.getenv("PORT") or 8080)

    # Local development entry point only. On Cloud Run the application is
    # served by Gunicorn instead; see the entrypoint in the Dockerfile.
    app.run(host="127.0.0.1", port=PORT, debug=True)