runnable-hub/python/example/chain/cli-simple.py (39 lines of code) (raw):

#!/usr/bin/env python3 import asyncio import os import sys import yaml import json current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.dirname(os.path.dirname(current_dir))) from runnable_workers.chainWorker.worker import Worker as ChainWorker from runnable_workers.toolWorker.worker import Worker as ToolWorker from runnable_workers.pythonWorker.worker import Worker as PythonWorker from runnable_workers.llmWorker.worker import Worker as LlmWorker from runnable_workers.processWorker.worker import Worker as ProcessWorker from runnable_workers.jinjaWorker.worker import Worker as JinjaWorker from runnable_workers.apiWorker.worker import Worker as ApiWorker from runnable_workers.chainWorker.request.chainRequest import ChainRequest from runnable_workers.toolWorker.request.toolDefine import ToolDefine from runnable_hub import RunnableHub from runnable_hub.store import RunnableLocalFileStore QWEN_SK = os.getenv("QWEN_SK") toolDefineYaml = """ toolCode: get_domain_ip toolVersion: v1 toolType: API setting: outputLoads: JSON headers: Accept: application/dns-json params: name: "{{ inputs.domain }}" type: A url: https://cloudflare-dns.com/dns-query method: GET inputSpec: - name: domain type: STRING required: true outputSpec: - name: ip type: STRING outputsLoads: JSON outputTemplate: | {"ip": "{{result.Answer[0].data}}"} """ requestYaml = f""" llm: endpoint: https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions model: qwen-plus secretKey: {QWEN_SK} """ requestYaml += """ data: inputs: prompt: What is the IP address of baidu.com? functions: - type: TOOL name: get_domain_ip version: v1 description: | A tool that can fetch the IP address of a domain. inputDefine: - name: domain type: STRING required: true systemPrompt: | Respond to the human as helpfully and accurately as possible. You have access to the following tools: {{ tool_info }} Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input). Valid "action" values: "Final Answer" or ssh Provide only ONE action per $JSON_BLOB, as shown: ``` { "action": $TOOL_NAME, "action_input": $INPUT } ``` Follow this format: Question: input question to answer Thought: consider previous and subsequent steps Action: ``` $JSON_BLOB ``` Observation: action result ... (repeat Thought/Action/Observation N times) Thought: I know what to respond Action: ``` { "action": "Final Answer", "action_input": "Final response to human" } ``` userPrompt: | Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation:. Thought: Human: {{ inputs.prompt }} onNext: | import re import json from pydantic import BaseModel from typing import Optional, Dict class FunctionCall(BaseModel): name: str input: Dict|str class ChainDo(BaseModel): message: Optional[str] = None # Message to be replied function: Optional[FunctionCall] = None # Function call to be executed finalAnswer: Optional[str] = None # Final answer to be returned overwriteMessage: Optional[str] = None # Message to be overwritten class ChainContext(BaseModel): message: str function: Optional[str] do: ChainDo = ChainDo() def onNext(context: ChainContext): if context.function is None: pattern = re.compile(r'Action:\s*```(.*?)```', re.DOTALL | re.IGNORECASE) match = pattern.search(context.message) if match: json_str = match.group(1).strip() # 提取内容并去除首尾空白 try: action_data = json.loads(json_str) if "final answer" in action_data.get("action","").lower(): context.do.finalAnswer = action_data["action_input"] elif action_data.get("action_input") is not None: context.do.function = FunctionCall( name=action_data["action"], input=action_data["action_input"]) except json.JSONDecodeError: raise Exception("json parse fail, content:" + json_str) else: raise Exception("No action found") else: context.do.message = "Observation: " + context.function return context """ async def main(): toolWorker = ToolWorker(store=RunnableLocalFileStore("/tmp/tool")) runnableHub = RunnableHub(store=RunnableLocalFileStore("/tmp/")) runnableHub.registerWorker(ChainWorker()) runnableHub.registerWorker(toolWorker) runnableHub.registerWorker(PythonWorker()) runnableHub.registerWorker(LlmWorker()) runnableHub.registerWorker(ProcessWorker()) runnableHub.registerWorker(JinjaWorker()) runnableHub.registerWorker(ApiWorker()) print(runnableHub.workers) toolWorker.addTool(ToolDefine.model_validate_json(json.dumps(yaml.safe_load(toolDefineYaml)))) runnableContext = await runnableHub.executeStart(ChainRequest.model_validate_json(json.dumps(yaml.safe_load(requestYaml)))) runnableContext = await runnableHub.executeWait(runnableContext) print(json.dumps(json.loads(runnableContext.model_dump_json()), indent=4)) if __name__ == "__main__": asyncio.run(main())