runnable-hub/python/example/process/cli-shell-io.py (24 lines of code) (raw):

#!/usr/bin/env python3
"""Example: run a two-step shell process through RunnableHub and print the result.

The process request is written as YAML, converted to JSON, validated into a
``ProcessRequest``, executed, and the final context is dumped as indented JSON.
"""

import asyncio
import os
import sys
import yaml
import json

# Put the directory two levels above this example's folder on sys.path so the
# `runnable_workers` / `runnable_hub` imports below resolve without installation.
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(os.path.dirname(current_dir)))

from runnable_workers.processWorker.worker import Worker as ProcessWorker
from runnable_workers.processWorker.request.processRequest import ProcessRequest
from runnable_workers.shellWorker.worker import Worker as ShellWorker
from runnable_hub import RunnableHub
from runnable_hub.store import RunnableLocalFileStore

# Process definition: testStep1 writes "hello world" to the path given by
# `${{ outputs.abc.path }}`; testStep2 echoes `${{ steps.testStep1.outputs.abc }}`.
# NOTE(review): the YAML nesting was reconstructed from a whitespace-collapsed
# copy of this file -- confirm it against the ProcessRequest schema.
requestYaml = """
inputs:
  test_var: test
jobs:
  test:
    steps:
      - id: testStep1
        shell: |
          echo "set output abc = 'hello world ${{ inputs.test_var }}' "
          echo "hello world" > ${{ outputs.abc.path }}
      - id: testStep2
        shell: |
          echo "hhh"
          echo ${{ steps.testStep1.outputs.abc }}
"""


async def main():
    """Register the process and shell workers, run the request, print the context."""
    hub = RunnableHub(store=RunnableLocalFileStore("/tmp/"))

    # Instantiate and register one worker at a time, process worker first.
    for worker_cls in (ProcessWorker, ShellWorker):
        hub.registerWorker(worker_cls())
    print(hub.workers)

    # YAML -> plain Python objects -> JSON text, which the request model validates.
    request_json = json.dumps(yaml.safe_load(requestYaml))
    request = ProcessRequest.model_validate_json(request_json)

    context = await hub.executeStart(request)
    context = await hub.executeWait(context)

    # Re-parse the model's JSON so it can be re-emitted with 4-space indentation.
    context_data = json.loads(context.model_dump_json())
    print(json.dumps(context_data, indent=4))


if __name__ == "__main__":
    asyncio.run(main())