def conductor()

in src/conductor/conductor.py [0:0]


def conductor(composition): # main.
    wsk = None
    isObject = lambda x: isinstance(x, dict)

    # compile AST to FSM
    compiler = {}
    astnode = lambda f: compiler.setdefault(f.__name__[1:], f)

    @astnode
    def _sequence(parent, node):
        fsm = [{ 'parent': parent, 'type': 'pass' }]
        fsm.extend(compile(parent, *node['components']))
        return fsm

    @astnode
    def _action(parent, node):
        return [{ 'parent': parent, 'type': 'action', 'name': node['name'] }]

    @astnode
    def _asynchronous(parent, node):
        body = compile(parent, *node['components'])
        return [{ 'parent': parent, 'type': 'async', 'return': len(body) + 2}, *body, {'parent': parent, 'type': 'stop' }, {'parent': parent, 'type': 'pass' }]

    @astnode
    def _function(parent, node):
        return [{ 'parent': parent, 'type': 'function', 'exec': node['function']['exec'] }]

    @astnode
    def _ensure(parent, node):
        body = compile(parent, node['body'])
        finalizer = compile(parent, node['finalizer'])
        fsm = [{ 'parent': parent, 'type': 'try'}, *body, { 'parent': parent, 'type': 'exit' }, *finalizer]
        fsm[0]['catch'] = len(fsm) - len(finalizer)
        return fsm

    @astnode
    def _let(parent, node):
        body = compile(parent, *node['components'])
        return [{'parent': parent, 'type': 'let', 'let': node['declarations']}, *body, { 'parent': parent, 'type': 'exit' }]

    @astnode
    def _mask(parent, node):
        body = compile(parent, *node['components'])
        return [{'parent': parent, 'type': 'let', 'let': None}, *body, { 'parent': parent, 'type': 'exit' }]

    @astnode
    def _do(parent, node):
        handler = [ *compile(parent, node['handler']), {'parent': parent, 'type': 'pass'}]
        body = compile(parent, node['body'])
        fsm = [{ 'parent': parent, 'type': 'try' }, *body, { 'parent': parent, 'type': 'exit' }, *handler]
        fsm[0]['catch'] = len(fsm) - len(handler)
        fsm[len(fsm) - len(handler) - 1]['next'] = len(handler)
        return fsm

    @astnode
    def _when_nosave(parent, node):
        consequent = compile(parent, node['consequent'])
        alternate = [ *compile(parent, node['alternate']), { parent: 'parent', 'type': 'pass' }]
        fsm = [{ 'parent': parent, 'type': 'pass' },
            *compile(parent, node['test']),
            { 'parent': parent, 'type': 'choice', 'then': 1, 'else': len(consequent) + 1 },
            *consequent,
            *alternate]
        fsm[len(fsm) - len(alternate) - 1]['next'] = len(alternate)
        return fsm

    @astnode
    def _loop_nosave(parent, node):
        body = compile(parent, node['body'])
        test = compile(parent, node['test'])
        fsm = [{ 'parent': parent, 'type': 'pass' }, *test,
            { 'parent': parent, 'type': 'choice', 'then': 1, 'else': len(body) + 1 },
            *body, { parent: 'parent', 'type': 'pass' }]
        fsm[len(fsm) - 2]['next'] = 2 - len(fsm)
        return fsm

    @astnode
    def _doloop_nosave(parent, node):
        body = compile(parent, node['body'])
        test = compile(parent, node['test'])
        fsm = [{ 'parent': parent, 'type': 'pass' }, *body, *test,
               { 'parent': parent, 'type': 'choice', 'else': 1}, { parent: 'parent', 'type': 'pass' }]
        fsm[len(fsm) - 2]['then'] = 2 - len(fsm)
        return fsm

    def compile(parent, *node):
        nonlocal compiler
        if len(node) == 0:
            return [{'parent': parent, 'type': 'empty'}]
        if len(node) == 1:
            return compiler[node[0]['type']](node[0]['path'] if 'path' in node[0] else parent, node[0])
        return functools.reduce(lambda fsm, node: extends(fsm, compile(parent, node)), node, [])


    def extends(l, items):
        l.extend(items)
        return l

    fsm = compile('', composition)

    conductor = {}
    operator = lambda f: conductor.setdefault(f.__name__[1:], f)

    @operator
    def _choice(p, node, index, inspect, step):
        p['s']['state'] = index + (node['then'] if p['params']['value'] else node['else'])
        return None

    @operator
    def _try(p, node, index, inspect, step):
        p['s']['stack'].insert(0, { 'catch': index + node['catch'] })

    @operator
    def _let(p, node, index, inspect, step):
        p['s']['stack'].insert(0, { 'let': node['let'] }) # JSON.parse(JSON.stringify(jsonv.let))

    @operator
    def _exit(p, node, index, inspect, step):
        if len(p['s']['stack']) == 0:
            return internalError('pop from an empty stack')
        p['s']['stack'].pop(0)

    @operator
    def _action(p, node, index, inspect, step):
        return { 'method': 'action', 'action': node['name'], 'params': p['params'], 'state': { '$composer': p['s'] } }

    @operator
    def _function(p, node, index, inspect, step):
        result = None
        try:
            functionName = node['exec']['functionName'] if 'functionName' in node['exec'] else None
            result = run(node['exec']['code'], p, node['exec']['kind'], functionName)
        except Exception as error:
            result = { 'error': 'Function combinator threw an exception at AST node root'+node['parent']+' (see log for details)' }

        if callable(result):
            result = { 'error': 'Function combinator evaluated to a function type at AST node root'+node['parent']}

        # if a function has only side effects and no return value (or return None), return params
        p['params'] = p['params'] if result is None else result
        inspect_errors(p)
        return step(p)

    @operator
    def _empty(p, node, index, inspect, step):
        inspect_errors(p)

    @operator
    def _pass(p, node, index, inspect, step):
        pass

    @operator
    def _async(p, node, index, inspect, step):
        nonlocal wsk

        p['params']['$composer'] = { 'state': p['s']['state'], 'stack': [{ 'marker': True }] + p['s']['stack'] }
        p['s']['state'] = index + node['return']
        if wsk is None:
            wsk = openwhisk({ 'ignore_certs': True })
        try:
            response = wsk.actions.invoke({ 'name': os.getenv('__OW_ACTION_NAME'), 'params': p['params'] })
            result = { 'method': 'async', 'activationId': response['activationId'], 'sessionId': p['s']['session'] }

        except Exception as err:
            print(err) # invoke failed
            result = { 'error': 'Async combinator failed to invoke composition at AST node root'+node['parent']+' (see log for details)' }

        p['params'] = result
        inspect_errors(p)
        return step(p)

    def finish(q):
        return q['params'] if 'error' in q['params'] else { 'params': q['params'] }

    def encodeError(error):
        if isinstance(error, str) or not hasattr(error, "__getitem__"):
            return {
                'code': 500,
                'error': error
            }
        else:
            return {
                'code': error['code'] if isinstance(error['code'], int) else 500,
                'error': error['error'] if isinstance(error['error'], str) else (error['message'] if 'message' in error else 'An internal error occurred')
            }

    # error status codes
    #badRequest = lambda error: { 'code': 400, 'error': error }
    internalError = lambda error: encodeError(error)

    def inspect_errors(p):
        if not isObject(p['params']):
            p['params'] = { 'value': p['params'] }
        if 'error' in p['params']:
            p['params'] = { 'error': p['params']['error'] } # discard all fields but the error field
            p['s']['state'] = -1 # abort unless there is a handler in the stack
            while len(p['s']['stack']) > 0 and 'marker' not in p['s']['stack'][0]:
                first = p['s']['stack'][0]
                p['s']['stack'] = p['s']['stack'][1:]
                if 'catch' in first:
                    p['s']['state'] = first['catch']
                    if p['s']['state'] >= 0:
                        break

    def reduceRight(func, init, seq):
        if not seq:
            return init
        else:
            return func(reduceRight(func, init, seq[1:]), seq[0])

    def update(dict, dict2):
        dict.update(dict2)
        return dict

    # run function f on current stack
    def run(f, p, kind, functionName=None):
        # handle let/mask pairs
        view = []
        n = 0
        for frame in p['s']['stack']:
            if 'let' in frame and frame['let'] is None:
                n += 1
            elif 'let' in frame:
                if n == 0:
                    view.append(frame)
                else:
                    n -= 1
        # update value of topmost matching symbol on stack if any
        def set(symbol, value):
            lets = [element for element in view if 'let' in element and symbol in element['let']]
            if len(lets) > 0:
                element = lets[0]
                element['let'][symbol] = value # TODO: JSON.parse(JSON.stringify(value))

        # collapse stack for invocation
        env = reduceRight(lambda acc, cur: update(acc, cur['let']) if 'let' in cur and isinstance(cur['let'], dict) else acc, {}, view)
        if kind == 'python:3':
            main = '''exec(code + "\\n__out__['value'] = ''' + functionName + '''(env, args)", {'env': env, 'args': args, '__out__':__out__})'''
            code = f
        else: # lambda
            main = '''__out__['value'] = code(env, args)'''
            code = types.FunctionType(marshal.loads(base64.b64decode(bytearray(f, 'ASCII'))), {})

        try:
            out = {'value': None}
            exec(main, {'env': env, 'args': p['params'], 'code': code, '__out__': out})
            return out['value']
        finally:
            for name in env:
                set(name, env[name])

    def step(p):
        # final state, return composition result
        if p['s']['state'] < 0 or p['s']['state'] >= len(fsm):
            print('Entering final state')
            print(json.dumps(p['params']))
            return None

        # process one state
        node = fsm[p['s']['state']] # json definition for current state
        if 'path' in node:
            print('Entering composition'+node['path'])
        index = p['s']['state']
        p['s']['state'] = p['s']['state'] + node.get('next', 1)
        if not callable(conductor[node['type']]):
            return internalError('unexpected '+node['type']+' combinator')

        result = conductor[node['type']](p, node, index, inspect, step)
        return result if result is not None else step(p)


    def invoke(params):
        ''' do invocation '''
        pcomposer = params.get('$composer', {})
        if '$composer' in params:
            del params['$composer']
        pcomposer['session'] = pcomposer.get('session', os.getenv('__OW_ACTIVATION_ID'))

        # current state
        s = { 'state': 0, 'stack': [], 'resuming': True }
        s.update(pcomposer)
        p = { 's': s, 'params': params }

        if not isinstance(p['s']['state'], int):
            return internalError('state parameter is not a number')
        if not isinstance(p['s']['stack'], list):
            return internalError('stack parameter is not an array')

        if 'resuming' in pcomposer:
            inspect_errors(p) # handle error objects when resuming

        result = None
        try:
            result = step(p)
        except Exception as err:
            p['params'] = {'error': internalError(err)}

        return result if result is not None else finish(p)

    return invoke