runnable-hub/python/example/agent/cli-full-agent.py (54 lines of code) (raw):
#!/usr/bin/env python3
import asyncio
from json import tool
import os
import sys
from urllib import request
import yaml
import json
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(os.path.dirname(current_dir)))
from runnable_workers.agentWorker.worker import Worker as AgentWorker
from runnable_workers.processWorker.worker import Worker as ProcessWorker
from runnable_workers.toolWorker.worker import Worker as ToolWorker
from runnable_workers.pythonWorker.worker import Worker as PythonWorker
from runnable_workers.llmWorker.worker import Worker as LlmWorker
from runnable_workers.processWorker.worker import Worker as ProcessWorker
from runnable_workers.jinjaWorker.worker import Worker as JinjaWorker
from runnable_workers.apiWorker.worker import Worker as ApiWorker
from runnable_workers.chainWorker.worker import Worker as ChainWorker
from runnable_workers.agentWorker.request.agentRequest import AgentRequest
from runnable_workers.agentWorker.request.agentChainTemplate import AgentChainTemplate
from runnable_workers.llmWorker.request.llmSetting import LlmSetting
from runnable_workers.agentWorker.request.agentDefine import AgentDefine
from runnable_workers.toolWorker.request.toolDefine import ToolDefine
from runnable_hub import RunnableHub
from runnable_hub.store import RunnableLocalFileStore
QWEN_SK = os.getenv("QWEN_SK")
defineToolYaml = """
toolCode: get_domain_ip
toolVersion: v1
toolType: API
description: |
This tool is used to get the IP address of a domain.
setting:
outputLoads: JSON
headers:
Accept: application/dns-json
params:
name: "{{ inputs.domain }}"
type: A
url: https://cloudflare-dns.com/dns-query
method: GET
inputSpec:
- name: domain
type: STRING
required: true
outputSpec:
- name: ip
type: STRING
outputsLoads: JSON
outputTemplate: |
{"ip": "{{result.Answer[0].data}}"}
"""
definellmYaml = f"""
endpoint: https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
model: qwen-plus
secretKey: {QWEN_SK}
"""
defineChainTemplateYaml = """
systemPrompt: |
Respond to the human as helpfully and accurately as possible. You have access to the following tools:
{{ tool_info }}
Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
Valid "action" values: "Final Answer" or ssh
Provide only ONE action per $JSON_BLOB, as shown:
```
{
"action": $TOOL_NAME,
"action_input": $INPUT
}
```
Follow this format:
Question: input question to answer
Thought: consider previous and subsequent steps
Action:
```
$JSON_BLOB
```
Observation: action result
... (repeat Thought/Action/Observation N times)
Thought: I know what to respond
Action:
```
{
"action": "Final Answer",
"action_input": "Final response to human"
}
```
userPrompt: |
Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation:.
Thought:
Human: {{ inputs.prompt }}
onNext: |
import re
import json
from pydantic import BaseModel
from typing import Optional, Dict
class FunctionCall(BaseModel):
name: str
input: Dict|str
class ChainDo(BaseModel):
message: Optional[str] = None # Message to be replied
function: Optional[FunctionCall] = None # Function call to be executed
finalAnswer: Optional[str] = None # Final answer to be returned
overwriteMessage: Optional[str] = None # Message to be overwritten
class ChainContext(BaseModel):
message: str
function: Optional[str]
do: ChainDo = ChainDo()
def onNext(context: ChainContext):
if context.function is None:
pattern = re.compile(r'Action:\s*```(.*?)```', re.DOTALL | re.IGNORECASE)
match = pattern.search(context.message)
if match:
json_str = match.group(1).strip() # 提取内容并去除首尾空白
try:
action_data = json.loads(json_str)
if "final answer" in action_data.get("action","").lower():
context.do.finalAnswer = action_data["action_input"]
elif action_data.get("action_input") is not None:
context.do.function = FunctionCall(
name=action_data["action"],
input=action_data["action_input"])
except json.JSONDecodeError:
raise Exception("json parse fail, content:" + json_str)
else:
raise Exception("No action found")
else:
context.do.message = "Observation: " + context.function
return context
"""
defineAgentYaml = """
agentCode: domain_checker
agentVersion: v1
inputDefine:
- name: prompt
type: STRING
required: true
functions:
- type: TOOL
name: get_domain_ip
version: v1
prerun:
chainInputs:
prompt: ${{ jobs.replace.outputs }}
jobs:
replace:
outputs: ${{ steps.replace_text.outputs.prompt }}
steps:
- id: replace_text
python: |
import os
h = open(${{ outputs.prompt.path }}, "w")
h.write('${{ inputs.prompt }}'.replace('**', 'baidu.com'))
h.close()
postrun:
outputs: ${{ jobs.notice.outputs }}
jobs:
notice:
outputs: ${{ steps.notice_step.outputs.result }}
steps:
- id: notice_step
python: |
import os
h = open(${{ outputs.result.path }}, "w")
h.write('${{ inputs.chainAnswer }} this is postrun ')
h.close()
instruction: |
你是一个检查域名的智能体,能够告诉用户一个域名对应的IP地址。
chainTemplateCode: default-chain
llmCode: qwen-test
"""
requestYaml = """
agentCode: domain_checker
agentVersion: v1
inputs:
prompt: "What is the IP address of **?"
"""
async def main():
runnableHub = RunnableHub(store=RunnableLocalFileStore("/tmp/runnableHub/context"))
toolWorker = ToolWorker(store=RunnableLocalFileStore("/tmp/runnableHub/tool"))
agentWorker = AgentWorker(store=RunnableLocalFileStore("/tmp/runnableHub/agent"), toolWorker=toolWorker)
toolWorker.addTool(ToolDefine.model_validate_json(json.dumps(yaml.safe_load(defineToolYaml))))
agentWorker.addAgent(AgentDefine.model_validate_json(json.dumps(yaml.safe_load(defineAgentYaml))))
agentWorker.addChainTemplate("default-chain", AgentChainTemplate.model_validate_json(json.dumps(yaml.safe_load(defineChainTemplateYaml))))
agentWorker.addLlm("qwen-test", LlmSetting.model_validate_json(json.dumps(yaml.safe_load(definellmYaml))))
runnableHub.registerWorker(agentWorker)
runnableHub.registerWorker(toolWorker)
runnableHub.registerWorker(ProcessWorker())
runnableHub.registerWorker(ChainWorker())
runnableHub.registerWorker(PythonWorker())
runnableHub.registerWorker(LlmWorker())
runnableHub.registerWorker(ProcessWorker())
runnableHub.registerWorker(JinjaWorker())
runnableHub.registerWorker(ApiWorker())
print(runnableHub.workers)
runnableContext = await runnableHub.executeStart(AgentRequest.model_validate_json(json.dumps(yaml.safe_load(requestYaml))))
runnableContext = await runnableHub.executeWait(runnableContext)
print(json.dumps(json.loads(runnableContext.model_dump_json()), indent=4))
if __name__ == "__main__":
asyncio.run(main())