cloud-run-alwayson-cpu-weather-advisory/python/main.py (106 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import atexit
from datetime import datetime as dt
import os
from time import mktime, time
from threading import Event, Thread
from flask import Flask, render_template, request
import requests
from google.cloud import datastore
app = Flask(__name__)
DATASTORE = datastore.Client()
DEFAULT = 'CA'
MINUTE = 60
URL = 'https://api.weather.gov/alerts/active?area=%s'
STATES = frozenset((
'AK', 'AL', 'AR', 'AS', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL',
'GA', 'GU', 'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA',
'MD', 'ME', 'MI', 'MN', 'MO', 'MS', 'MT', 'NC', 'ND', 'NE', 'NH',
'NJ', 'NM', 'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'PR', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VA', 'VI', 'VT', 'WA', 'WI', 'WV', 'WY',
))
def getStateFromCache(state):
'get state alerts from cache'
query = DATASTORE.query(kind='State')
query.key_filter(DATASTORE.key('State', state))
results = list(query.fetch())
return results[0] if len(results) else None
def stateIsInCache(state):
'check if state alerts are in cache & "fresh"'
useCache = False
fma = time() - 15*MINUTE # "15 minutes ago"
stateData = getStateFromCache(state)
if stateData:
ts = stateData['lastUpdate']
lastUpdate = ts.timestamp() if hasattr(ts, 'timestamp') \
else mktime(ts.timetuple())
useCache = lastUpdate > fma
print(('** Cache fresh, use in-cache data (%s)' if useCache \
else '** Cache stale/missing, API fetch (%s)') % state)
return useCache
def _etl(advisory, state):
'extract/format relevant weather alert data'
prop = advisory['properties']
return {
'area': prop['areaDesc'],
'headline': prop['parameters']['NWSheadline'][0] \
if 'NWSheadline' in prop['parameters'] else prop['headline'],
'effective': prop['effective'],
'expires': prop['expires'],
'instrutions': prop['instruction'].replace('\n', ' ') \
if prop['instruction'] else '(none)'
}
def fetchState(state):
'fetch state weather alerts from API'
rsp = requests.get(URL % state) # call weather API
if rsp.status_code >= 400:
rsp.raise_for_status()
advisories = rsp.json()['features']
return [_etl(advisory, state) for advisory in advisories]
def cacheState(state, advisories): # cache state info
'cache state weather alerts to Datastore'
entity = datastore.Entity(key=DATASTORE.key('State', state))
entity.update({
'advisories': advisories,
'lastUpdate': dt.utcnow(), # last-fetched timestamp
})
DATASTORE.put(entity)
def processState(state):
'check if state in cache & fresh; fetch & cache if not'
if not stateIsInCache(state):
advisories = fetchState(state)
cacheState(state, advisories)
@app.route('/', methods=['GET', 'POST'])
def root():
'main application handler (GET/POST)'
context = {'meth': request.method, 'state': DEFAULT}
# GET: render empty form
# POST: process user request, display results, render empty form
if request.method == 'POST':
try:
state = request.form['state'].strip()[:2].upper() or DEFAULT
context['state'] = state
processState(state)
stateData = getStateFromCache(state)
if stateData:
context['advs'] = stateData['advisories']
else:
context['error'] = 'ERROR: problem with request for {%s}' % state
except Exception as e:
error = 'ERROR: problem with request for %s: %s' % (state, e)
print(error)
context['error'] = error
return render_template('index.html', **context)
def _setInterval(func, interval):
'mimic JS setInterval() - call return value to stop'
stop = Event()
def loop():
while not stop.wait(interval):
func()
Thread(target=loop).start()
return stop.set
def updateCache():
'check each state and update cache as necessary'
for state in STATES:
processState(state)
if __name__ == '__main__':
# always-on CPU refreshes cache every 5 minutes
# (max 3x per always-on CPU re 15 min shutdown)
atexit.register(_setInterval(updateCache, 5*MINUTE))
app.run('0.0.0.0', int(os.environ.get('PORT', 8080)))