in src/conductor/conductor.py [0:0]
def conductor(composition):  # main.
    """Compile a composition AST into a state machine and return its runner.

    The AST is compiled once into a flat list of states (``fsm``). The returned
    ``invoke(params)`` function interprets that list, starting at the state and
    stack found under ``params['$composer']`` (or at state 0 with an empty
    stack when that key is absent).

    Args:
        composition: AST node, a dict with a ``'type'`` key naming one of the
            registered compilers (``sequence``, ``action``, ``asynchronous``,
            ``function``, ``ensure``, ``let``, ``mask``, ``do``,
            ``when_nosave``, ``loop_nosave``, ``doloop_nosave``) plus the
            fields that compiler reads. An optional ``'path'`` key replaces
            the inherited parent path.

    Returns:
        ``invoke(params)``, which returns one of:
        ``{'params': ...}`` on normal completion, the params dict itself when
        it carries an ``'error'`` field, an ``{'method': 'action', ...}``
        request when an action state is reached, or a ``{'code', 'error'}``
        dict for internal errors.

    Raises:
        KeyError: at compile time if a node type has no registered compiler.
    """
    wsk = None  # OpenWhisk client, created on first use by the async operator

    def isObject(x):
        return isinstance(x, dict)

    # ------------------------------------------------------------------
    # compile AST to FSM
    # ------------------------------------------------------------------
    compiler = {}

    def astnode(f):
        # register f under its name without the leading underscore
        return compiler.setdefault(f.__name__[1:], f)

    def compile_ast(parent, *nodes):
        """Compile zero or more sibling AST nodes into one list of states."""
        if not nodes:
            return [{'parent': parent, 'type': 'empty'}]
        if len(nodes) == 1:
            node = nodes[0]
            return compiler[node['type']](node['path'] if 'path' in node else parent, node)
        states = []
        for node in nodes:
            states.extend(compile_ast(parent, node))
        return states

    @astnode
    def _sequence(parent, node):
        return [{'parent': parent, 'type': 'pass'},
                *compile_ast(parent, *node['components'])]

    @astnode
    def _action(parent, node):
        return [{'parent': parent, 'type': 'action', 'name': node['name']}]

    @astnode
    def _asynchronous(parent, node):
        body = compile_ast(parent, *node['components'])
        # 'return' is the offset from the async state to the trailing pass,
        # i.e. it skips over the body and the stop state
        return [{'parent': parent, 'type': 'async', 'return': len(body) + 2},
                *body,
                {'parent': parent, 'type': 'stop'},
                {'parent': parent, 'type': 'pass'}]

    @astnode
    def _function(parent, node):
        return [{'parent': parent, 'type': 'function', 'exec': node['function']['exec']}]

    @astnode
    def _ensure(parent, node):
        body = compile_ast(parent, node['body'])
        finalizer = compile_ast(parent, node['finalizer'])
        states = [{'parent': parent, 'type': 'try'}, *body,
                  {'parent': parent, 'type': 'exit'}, *finalizer]
        # on error jump to the first finalizer state
        states[0]['catch'] = len(states) - len(finalizer)
        return states

    @astnode
    def _let(parent, node):
        body = compile_ast(parent, *node['components'])
        return [{'parent': parent, 'type': 'let', 'let': node['declarations']},
                *body,
                {'parent': parent, 'type': 'exit'}]

    @astnode
    def _mask(parent, node):
        body = compile_ast(parent, *node['components'])
        # a let frame with let=None hides the nearest enclosing let (see run)
        return [{'parent': parent, 'type': 'let', 'let': None},
                *body,
                {'parent': parent, 'type': 'exit'}]

    @astnode
    def _do(parent, node):
        handler = [*compile_ast(parent, node['handler']), {'parent': parent, 'type': 'pass'}]
        body = compile_ast(parent, node['body'])
        states = [{'parent': parent, 'type': 'try'}, *body,
                  {'parent': parent, 'type': 'exit'}, *handler]
        states[0]['catch'] = len(states) - len(handler)
        # on success the exit state jumps to the final pass, skipping the handler
        states[len(states) - len(handler) - 1]['next'] = len(handler)
        return states

    @astnode
    def _when_nosave(parent, node):
        consequent = compile_ast(parent, node['consequent'])
        alternate = [*compile_ast(parent, node['alternate']), {'parent': parent, 'type': 'pass'}]
        states = [{'parent': parent, 'type': 'pass'},
                  *compile_ast(parent, node['test']),
                  {'parent': parent, 'type': 'choice', 'then': 1, 'else': len(consequent) + 1},
                  *consequent,
                  *alternate]
        # last state of the consequent jumps to the final pass, skipping the alternate
        states[len(states) - len(alternate) - 1]['next'] = len(alternate)
        return states

    @astnode
    def _loop_nosave(parent, node):
        body = compile_ast(parent, node['body'])
        test = compile_ast(parent, node['test'])
        states = [{'parent': parent, 'type': 'pass'}, *test,
                  {'parent': parent, 'type': 'choice', 'then': 1, 'else': len(body) + 1},
                  *body,
                  {'parent': parent, 'type': 'pass'}]
        # last state of the body jumps back to the first state of the loop
        states[len(states) - 2]['next'] = 2 - len(states)
        return states

    @astnode
    def _doloop_nosave(parent, node):
        body = compile_ast(parent, node['body'])
        test = compile_ast(parent, node['test'])
        states = [{'parent': parent, 'type': 'pass'}, *body, *test,
                  {'parent': parent, 'type': 'choice', 'else': 1},
                  {'parent': parent, 'type': 'pass'}]
        # a true test jumps back to the first state of the loop
        states[len(states) - 2]['then'] = 2 - len(states)
        return states

    fsm = compile_ast('', composition)

    # ------------------------------------------------------------------
    # error helpers
    # ------------------------------------------------------------------
    def encodeError(error):
        """Normalize a string, dict or exception into ``{'code', 'error'}``."""
        default_message = 'An internal error occurred'
        if isinstance(error, str):
            return {'code': 500, 'error': error}
        if isinstance(error, dict):
            code = error.get('code')
            message = error.get('error')
            if not isinstance(message, str):
                message = error.get('message', default_message)
            return {'code': code if isinstance(code, int) else 500, 'error': message}
        if isinstance(error, BaseException):
            # keep the result JSON-serializable
            return {'code': 500, 'error': str(error) or default_message}
        return {'code': 500, 'error': error}

    def internalError(error):
        return encodeError(error)

    def finish(q):
        return q['params'] if 'error' in q['params'] else {'params': q['params']}

    def inspect_errors(p):
        """Wrap non-dict params; on error, unwind the stack to a catch frame."""
        if not isObject(p['params']):
            p['params'] = {'value': p['params']}
        if 'error' in p['params']:
            p['params'] = {'error': p['params']['error']}  # discard all fields but the error field
            p['s']['state'] = -1  # abort unless there is a handler in the stack
            while len(p['s']['stack']) > 0 and 'marker' not in p['s']['stack'][0]:
                first = p['s']['stack'][0]
                p['s']['stack'] = p['s']['stack'][1:]
                if 'catch' in first:
                    p['s']['state'] = first['catch']
                    if p['s']['state'] >= 0:
                        break

    # ------------------------------------------------------------------
    # run function f on current stack
    # ------------------------------------------------------------------
    def run(f, p, kind, functionName=None):
        """Execute an inline function against the visible let-variables.

        ``f`` is Python source when ``kind == 'python:3'`` (``functionName``
        names the entry point defined by that source); otherwise it is a
        base64-encoded marshalled code object. The function is called as
        ``fn(env, args)`` where ``env`` is the collapsed view of the let
        frames and ``args`` is ``p['params']``. After the call, every name in
        ``env`` is written back to the topmost visible frame declaring it.
        """
        # handle let/mask pairs: each mask frame hides the next let frame below it
        view = []
        masked = 0
        for frame in p['s']['stack']:
            if 'let' not in frame:
                continue
            if frame['let'] is None:
                masked += 1
            elif masked == 0:
                view.append(frame)
            else:
                masked -= 1

        # update value of topmost matching symbol on stack if any
        def assign(symbol, value):
            for frame in view:
                if symbol in frame['let']:
                    frame['let'][symbol] = value  # TODO: JSON.parse(JSON.stringify(value))
                    return

        # collapse stack for invocation; frames nearer the top of the stack win
        env = {}
        for frame in reversed(view):
            if isinstance(frame['let'], dict):
                env.update(frame['let'])

        if kind == 'python:3':
            if functionName is None:
                raise ValueError('python:3 function requires a functionName')
            source = f + "\n__out__['value'] = " + functionName + "(env, args)"

            def call(out):
                # NOTE(review): executes composition-supplied source; only safe
                # if compositions come from a trusted author
                exec(source, {'env': env, 'args': p['params'], '__out__': out})
        else:  # lambda
            # NOTE(review): marshal.loads on composition-supplied bytes; only
            # safe if compositions come from a trusted author
            func = types.FunctionType(marshal.loads(base64.b64decode(bytearray(f, 'ASCII'))), {})

            def call(out):
                out['value'] = func(env, p['params'])

        try:
            out = {'value': None}
            call(out)
            return out['value']
        finally:
            for name in env:
                assign(name, env[name])

    # ------------------------------------------------------------------
    # state operators; each returns None to continue or a value to stop
    # ------------------------------------------------------------------
    operators = {}

    def operator(f):
        return operators.setdefault(f.__name__[1:], f)

    @operator
    def _choice(p, node, index, inspect, step):
        # a test result without a 'value' field counts as false
        taken = node['then'] if p['params'].get('value') else node['else']
        p['s']['state'] = index + taken
        return None

    @operator
    def _try(p, node, index, inspect, step):
        p['s']['stack'].insert(0, {'catch': index + node['catch']})

    @operator
    def _let(p, node, index, inspect, step):
        # copy the declarations so writes to variables never modify the
        # compiled fsm, which is reused by later invocations
        p['s']['stack'].insert(0, {'let': json.loads(json.dumps(node['let']))})

    @operator
    def _exit(p, node, index, inspect, step):
        if len(p['s']['stack']) == 0:
            return internalError('pop from an empty stack')
        p['s']['stack'].pop(0)

    @operator
    def _action(p, node, index, inspect, step):
        return {'method': 'action', 'action': node['name'], 'params': p['params'],
                'state': {'$composer': p['s']}}

    @operator
    def _function(p, node, index, inspect, step):
        result = None
        try:
            functionName = node['exec']['functionName'] if 'functionName' in node['exec'] else None
            result = run(node['exec']['code'], p, node['exec']['kind'], functionName)
        except Exception as error:
            print(error)  # the error message below points the user at the log
            result = {'error': 'Function combinator threw an exception at AST node root'+node['parent']+' (see log for details)'}
        if callable(result):
            result = {'error': 'Function combinator evaluated to a function type at AST node root'+node['parent']}
        # if a function has only side effects and no return value (or return None), return params
        p['params'] = p['params'] if result is None else result
        inspect_errors(p)
        return None  # the step loop continues with the next state

    @operator
    def _empty(p, node, index, inspect, step):
        inspect_errors(p)

    @operator
    def _pass(p, node, index, inspect, step):
        pass

    @operator
    def _stop(p, node, index, inspect, step):
        # emitted after an asynchronous body: end this invocation here
        p['s']['state'] = -1

    @operator
    def _async(p, node, index, inspect, step):
        nonlocal wsk
        # the spawned invocation starts at the body with a marker frame on its stack
        p['params']['$composer'] = {'state': p['s']['state'],
                                    'stack': [{'marker': True}] + p['s']['stack']}
        p['s']['state'] = index + node['return']
        if wsk is None:
            wsk = openwhisk({'ignore_certs': True})
        try:
            response = wsk.actions.invoke({'name': os.getenv('__OW_ACTION_NAME'), 'params': p['params']})
            result = {'method': 'async', 'activationId': response['activationId'], 'sessionId': p['s']['session']}
        except Exception as err:
            print(err)  # invoke failed
            result = {'error': 'Async combinator failed to invoke composition at AST node root'+node['parent']+' (see log for details)'}
        p['params'] = result
        inspect_errors(p)
        return None  # the step loop continues with the next state

    def step(p):
        """Run states until one returns a value or a final state is reached.

        Returns None on reaching a final state, otherwise the value returned
        by the operator that stopped the run.
        """
        while True:
            index = p['s']['state']
            # final state, return composition result
            if index < 0 or index >= len(fsm):
                print('Entering final state')
                print(json.dumps(p['params']))
                return None
            # process one state
            node = fsm[index]  # json definition for current state
            if 'path' in node:
                print('Entering composition'+node['path'])
            # default transition; operators may overwrite it
            p['s']['state'] = index + node.get('next', 1)
            handler = operators.get(node['type'])
            if not callable(handler):
                return internalError('unexpected '+node['type']+' combinator')
            result = handler(p, node, index, inspect_errors, step)
            if result is not None:
                return result

    def invoke(params):
        ''' do invocation '''
        pcomposer = params.pop('$composer', {})
        pcomposer.setdefault('session', os.getenv('__OW_ACTIVATION_ID'))
        # current state
        s = {'state': 0, 'stack': [], 'resuming': True}
        s.update(pcomposer)
        p = {'s': s, 'params': params}
        if not isinstance(p['s']['state'], int):
            return internalError('state parameter is not a number')
        if not isinstance(p['s']['stack'], list):
            return internalError('stack parameter is not an array')
        if 'resuming' in pcomposer:
            inspect_errors(p)  # handle error objects when resuming
        result = None
        try:
            result = step(p)
        except Exception as err:
            print(err)
            p['params'] = {'error': internalError(err)}
        return result if result is not None else finish(p)

    return invoke