nuvolaris/actionexecutor.py (184 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, logging, json
import nuvolaris.couchdb_util as cu
import nuvolaris.kube as kube
import croniter as cn
import requests as req
from datetime import datetime
def check(f, what, res):
if f:
logging.info(f"OK: {what}")
return res and True
else:
logging.warn(f"ERR: {what}")
return False
#
# extract the configured interval between
# two consecutive execution of this process
# scheduled via the nuvolaris cron component
#
def from_cron_to_seconds(base, cronExpr):
"""
>>> import nuvolaris.actionexecutor as ae
>>> from datetime import datetime
>>> base = datetime.now()
>>> ae.from_cron_to_seconds(base,'* * * * *')
60.0
>>> ae.from_cron_to_seconds(base,'*/30 * * * *')
1800.0
"""
itr = cn.croniter(cronExpr, base)
nextTime1 = itr.get_next(datetime)
nextTime2 = itr.get_next(datetime)
diff = nextTime2 - nextTime1
return diff.total_seconds()
#
# Check if an action with the specified cron expression
# should have been triggered since the last execution of this scheduled job.
#
def action_should_trigger(currentDate, executionInterval, actionCronExpression):
"""
>>> import nuvolaris.actionexecutor as ae
>>> from datetime import datetime
>>> base = datetime.now()
>>> base1 = datetime(2022, 8, 6, 16, 30, 0, 0)
>>> base2 = datetime(2022, 8, 6, 16, 00, 0, 0)
>>> base3 = datetime(2022, 8, 6, 16, 3, 0, 0)
>>> base4 = datetime(2022, 8, 6, 16, 10, 0, 0)
>>> ae.action_should_trigger(base,60,'* * * * *')
True
>>> ae.action_should_trigger(base1, 60,'*/30 * * * *')
True
>>> ae.action_should_trigger(base2, 60,'*/30 * * * *')
True
>>> ae.action_should_trigger(base3, 60,'*/5 * * * *')
False
>>> ae.action_should_trigger(base4, 60,'*/5 * * * *')
True
"""
currentTimestamp = datetime.timestamp(currentDate)
prevTimestamp = currentTimestamp - executionInterval
prevDate = datetime.fromtimestamp(prevTimestamp)
result = False
for dt in cn.croniter_range(prevDate, currentDate, actionCronExpression):
if(dt):
result = True
break
return result
#
# query the dbn database using the specified selecto
#
def find_docs(db, dbn, selector, username, password):
documents = []
query = json.loads(selector)
logging.info(f"Querying couchdb {dbn} for documents")
#CouchDB returns no more than 25 records. We iterate to get all the cron enabled actions.
while(True):
logging.info(f"select query param {json.dumps(query)}")
res = db.find_doc(dbn, json.dumps(query), username, password)
if(res == None):
break
if(res['docs']):
docs = list(res['docs'])
if(len(docs) > 0):
documents.extend(docs)
if(res['bookmark']):
query['bookmark']=res['bookmark']
else:
logging.info('docs item is an emtpy list. No more documents found')
break
else:
logging.info('docs items not present. no more documents found')
break
return list(documents)
#
# Get subject from nuvolaris_subjects db
#
#TODO need to find a way to build a dictionary with dynamic key or a hashmap
def get_subjects(db, username, password):
subjects = []
selector = '{"selector":{"subject": {"$exists": true}},"fields":["namespaces"]}'
namespaces = find_docs(db, "subjects", selector, username, password)
for entry in namespaces:
currentNamespaceList = list(entry['namespaces'])
for namespace in currentNamespaceList:
subjects.append(namespace);
return list(subjects)
#
# get actions from the nuvolaris_whisks db
# having an annotation with key=cron or key=autoexec and value=true
#
def get_cron_aware_actions(db, username, password):
selector = '{"selector":{"entityType":"action", "$or":[{"annotations": {"$elemMatch": {"key": "cron"}}},{"annotations": {"$elemMatch": {"$and":[{"key":"autoexec"},{"value":true}]}}}] }, "fields": ["_id", "annotations", "name", "_rev","namespace","parameters","entityType"]}'
return find_docs(db, "whisks", selector, username, password)
#
# POST a request to invoke the ow action
#
def call_ow_action(url, parameters, ow_auth):
logging.info(f"POST request to {url}")
headers = {'Content-Type': 'application/json'}
try:
response = None
if(len(parameters)>0):
logging.info("calling an action with %s parameters",len(parameters))
# OpenWhisk passes automatically the registered parameters, so this cron executor does not need to pass them.
response = req.post(url, auth=(ow_auth['username'],ow_auth['password']))
if (response.status_code in [200,202]):
logging.info(f"call to {url} succeeded with {response.status_code}. Body {response.text}")
return True
logging.warn(f"query to {url} failed with {response.status_code}. Body {response.text}")
return False
except Exception as inst:
logging.warn(f"Failed to invoke action {type(inst)}")
logging.warn(inst)
return False
#
# Update the action annotations to disable the cron execution
# by setting {"autoexec":false}
#
def unschedule_autoexec_action(action_url, ow_auth):
logging.info(f"Purging cron details from action {action_url}")
headers = {'Content-Type': 'application/json'}
try:
updating_data = {}
updated_annotation = [{"key":"autoexec","value":False}]
updating_data["annotations"]=updated_annotation
logging.info(f"updating with {json.dumps(updating_data)}")
response = req.put(f"{action_url}?overwrite=true", auth=(ow_auth['username'],ow_auth['password']), headers=headers, data=json.dumps(updating_data))
if response.status_code != 200:
logging.warn(f"PUT call to {action_url}?overwrite=true failed with {response.status_code}. Body {response.text}")
return False
logging.info(f"PUT call to {action_url}?overwrite=true succeeded with {response.status_code}. Action cron policy removed")
return True
except Exception as inst:
logging.warn(f"Failed to invoke action {type(inst)}")
logging.warn(inst)
return False
#
# Extract the cron expression from the given annotations list
#
def get_cron_expression(actionAnnotations):
"""
>>> import nuvolaris.actionexecutor as ae
>>> annotations = []
>>> annotations.append({"key":"cron","value":"*/2 * * * *"})
>>> annotations.append({"key":"provide-api-key","value":False})
>>> annotations.append({"key":"exec","value":"nodejs:14"})
>>> "*/2 * * * *" == ae.get_cron_expression(annotations)
True
>>> "once" == ae.get_cron_expression([{"key":"cron","value":"once"}])
True
"""
for a in actionAnnotations:
if(a['key'] == 'cron'):
return a['value']
return None
#
# Extract the autoexect flag from the given annotations list
#
def get_autoexec(actionAnnotations):
"""
>>> import nuvolaris.actionexecutor as ae
>>> annotations = []
>>> annotations.append({"key":"autoexec","value":True})
>>> annotations.append({"key":"provide-api-key","value":False})
>>> annotations.append({"key":"exec","value":"nodejs:14"})
>>> ae.get_autoexec(annotations)
True
>>> ae.get_autoexec([{"key":"autoexec","value":False}])
False
"""
for a in actionAnnotations:
if(a['key'] == 'autoexec'):
return a['value']
return False
#
# Determine if the action should be triggered
# possible return values are
# execute if the action should be triggered according to current date
# no_execution if the action should not be triggered or the cron expression is not a valid one
#
def should_trigger(actionNamespace, actionName, actionCronExpression, currentDate, executionInterval):
if not cn.croniter.is_valid(actionCronExpression):
logging.warn(f"action {actionNamespace}/{actionName} cron expression {actionCronExpression} is not valid. Skipping execution")
return "no_execution"
if not action_should_trigger(currentDate, executionInterval, actionCronExpression):
logging.warn(f"action {actionNamespace}/{actionName} cron expression {actionCronExpression} does not trigger an execution at {currentDate}")
return "no_execution"
return "execute"
def build_action_url(baseurl,namespace, package, action_name):
"""
>>> import nuvolaris.actionexecutor as ae
>>> url = ae.build_action_url("http://localhost:3233/api/v1/namespaces/","nuvolaris","mongo","mongo")
>>> url == "http://localhost:3233/api/v1/namespaces/nuvolaris/actions/mongo/mongo"
True
>>> url = ae.build_action_url("http://localhost:3233/api/v1/namespaces/","nuvolaris",None,"mongo")
>>> url == "http://localhost:3233/api/v1/namespaces/nuvolaris/actions/mongo"
True
>>> url = ae.build_action_url("http://localhost:3233/api/v1/namespaces/","nuvolaris","","mongo")
>>> url == "http://localhost:3233/api/v1/namespaces/nuvolaris/actions/mongo"
True
"""
url = f"{baseurl}{namespace}/actions/"
if package:
url += f"{package}/"
url += action_name
return url
def get_package_from_namespace(action_namespace):
"""
>>> import nuvolaris.actionexecutor as ae
>>> package = ae.get_package_from_namespace("nuvolaris")
>>> package == ""
True
>>> package = ae.get_package_from_namespace("nuvolaris/mongo")
>>> package == "mongo"
True
>>> package = ae.get_package_from_namespace("nuvolaris/mongo/mongo")
>>> package == "mongo/mongo"
True
"""
parts = action_namespace.partition("/")
if parts[2]:
return parts[2]
return ""
def get_subject(action_namespace):
"""
>>> import nuvolaris.actionexecutor as ae
>>> package = ae.get_subject("nuvolaris")
>>> package == "nuvolaris"
True
>>> package = ae.get_subject("nuvolaris/mongo")
>>> package == "nuvolaris"
True
>>> package = ae.get_subject("nuvolaris/mongo/mongo")
>>> package == "nuvolaris"
True
"""
return action_namespace.partition("/")[0]
#
# Evaluate if the given whisk action must be executed or not
#
# dAction input is a json Object with similar structure.
#
# dAction = '{"_id":"nuvolaris/hello-cron-action","annotations":[{"key":"cron","value":"*/2 * * * *"},{"key":"provide-api-key","value":false},{"key":"exec","value":"nodejs:14"}],"name":"hello-cron-action","_rev":"1-19f424e1fec1c02a2ecccf6f90978e31","namespace":"nuvolaris","parameters":[],"entityType":"action"}'
def handle_action(baseurl, currentDate, executionInterval, dAction, subjects):
actionName = dAction['name']
entityType = dAction['entityType']
actionNamespace = dAction['namespace']
actionParameters = list(dAction['parameters'])
actionAnnotations = list(dAction['annotations'])
autoexecAction = get_autoexec(actionAnnotations)
execute = "no_execution"
#gives always precedence to the autoexec annotations
if autoexecAction:
execute = "execute_once"
else:
actionCronExpression = get_cron_expression(actionAnnotations)
execute = should_trigger(actionNamespace,actionName,actionCronExpression,currentDate,executionInterval)
if "no_execution" == execute:
return None
namespaceSubject = get_subject(actionNamespace)
auth = get_auth(subjects, namespaceSubject)
if(auth):
package = get_package_from_namespace(actionNamespace)
base_action_url = build_action_url(baseurl,namespaceSubject, package, actionName)
ret = call_ow_action(f"{base_action_url}?blocking=false&result=false", actionParameters, auth)
if ret and "execute_once" == execute:
unschedule_autoexec_action(base_action_url, auth)
else:
logging.warn('No subject {subjectName} credentials found!')
return None
#
# Search and return a {'username':'xxx','passowrd':'xxx'} dictionary
#
def get_auth(subjects, subjectName):
for subject in subjects:
if(subject['name'] == subjectName):
return {'username':subject['uuid'], 'password':subject['key']}
return None
#
# Will queries the internal CouchDB for cron aware actions
# to be triggered since the last execution time.
# TODO the interval execution time must be parametrized.
# Implement the logic to query for actions and evaluate how to execute them
#
def start():
# load scheduler config from the os environment
cfg = os.environ.get("SCHEDULER_CONFIG")
if cfg:
logging.basicConfig(level=logging.INFO)
config = json.loads(cfg)
currentDate = datetime.now()
interval = from_cron_to_seconds(currentDate, config['scheduler.schedule'])
logging.info(f"interval in seconds between 2 execution is {interval} seconds")
db = cu.CouchDB()
res = check(db.wait_db_ready(30), "wait_db_ready", True)
if(res):
ow_protocol = config['controller.protocol']
ow_host = config['controller.host']
ow_port = config['controller.port']
baseurl = f"{ow_protocol}://{ow_host}:{ow_port}/api/v1/namespaces/"
actions = get_cron_aware_actions(db, config['couchdb.controller.user'],config['couchdb.controller.password'])
if(len(actions) > 0):
subjects = get_subjects(db, config['couchdb.controller.user'],config['couchdb.controller.password'])
for action in actions:
handle_action(baseurl, currentDate, interval, action, subjects)
else:
logging.info('No cron aware action extracted. Exiting....')
else:
logging.warn("CouchDB it is not available. Exiting....")