runnable-hub/python/example/process/cli-api-python.py (25 lines of code) (raw):
#!/usr/bin/env python3
import asyncio
import os
import sys
import yaml
import json
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(os.path.dirname(current_dir)))
from runnable_workers.processWorker.worker import Worker as ProcessWorker
from runnable_workers.processWorker.request.processRequest import ProcessRequest
from runnable_workers.apiWorker.worker import Worker as ApiWorker
# from runnable_workers.jinjaWorker.worker import Worker as JinjaWorker
from runnable_hub import RunnableHub
from runnable_hub.store import RunnableLocalFileStore
QWEN_SK = os.getenv("QWEN_SK")
requestYaml = f"""
jobs:
api:
steps:
- id: api
api:
url: https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
method: POST
outputLoads: JSON
headers:
Authorization: Bearer {QWEN_SK}
Content-Type: application/json
payloads:
model: qwen-plus
messages:
- role: system
content: You are a helpful assistant.
- role: user
content: |
请帮我使用python的正则匹配json,返回的python代码请放在
Action:
{{
"action":"python",
"action_input":"..."
}}
这样的格式中
"""
async def main():
runnableHub = RunnableHub(store=RunnableLocalFileStore("/tmp/"))
runnableHub.registerWorker(ProcessWorker())
runnableHub.registerWorker(ApiWorker())
# runnableHub.registerWorker(JinjaWorker())
print(runnableHub.workers)
runnableContext = await runnableHub.executeStart(
ProcessRequest.model_validate_json(json.dumps(yaml.safe_load(requestYaml))))
runnableContext = await runnableHub.executeWait(runnableContext)
print(json.dumps(json.loads(runnableContext.model_dump_json()), indent=4))
if __name__ == "__main__":
asyncio.run(main())