runnable-hub/python/example/tool/cli-api-tool.py (31 lines of code) (raw):
#!/usr/bin/env python3
import asyncio
import os
import sys
import yaml
import json
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(os.path.dirname(current_dir)))
from runnable_workers.apiWorker.worker import Worker as ApiWorker
from runnable_workers.processWorker.worker import Worker as ProcessWorker
from runnable_workers.jinjaWorker.worker import Worker as JinjaWorker
from runnable_workers.toolWorker.worker import Worker as ToolWorker
from runnable_workers.toolWorker.request.toolRequest import ToolRequest
from runnable_workers.toolWorker.request.toolDefine import ToolDefine
from runnable_hub import RunnableHub
from runnable_hub.store import RunnableLocalFileStore
# YAML text of the tool invocation used by this example. main() parses it with
# yaml.safe_load, re-serializes it with json.dumps and validates the result
# into a ToolRequest. It names the tool `get_domain_ip` (version `v1`) and
# supplies `baidu.com` as the `domain` value.
# NOTE(review): in this view every key sits at column 0, so `domain` is not
# nested under `inputs`; presumably the original file indents it. Confirm
# against the source before relying on the parsed shape.
requestYaml = """
toolCode: get_domain_ip
toolVersion: v1
inputs:
domain: baidu.com
"""
# YAML text of the tool definition registered by this example. main() parses
# it with yaml.safe_load, re-serializes it with json.dumps, validates it into
# a ToolDefine and hands that to toolWorker.addTool().
# The text declares an `API`-type tool `get_domain_ip` (version `v1`) whose
# settings mention a GET to https://cloudflare-dns.com/dns-query with an
# `Accept: application/dns-json` header, a `name` parameter templated from
# `inputs.domain` and `type: A`; the output template renders a JSON object
# whose `ip` comes from `result.Answer[0].data`.
# NOTE(review): key nesting (e.g. headers/params under `setting`, list item
# fields under `inputSpec`/`outputSpec`) is not visible in this view because
# every line sits at column 0; presumably the original file indents them.
# NOTE(review): `outputLoads` and `outputsLoads` are spelled differently below;
# confirm which key(s) the ToolDefine schema actually reads.
defineYaml = """
toolCode: get_domain_ip
toolVersion: v1
toolType: API
setting:
outputLoads: JSON
headers:
Accept: application/dns-json
params:
name: "{{ inputs.domain }}"
type: A
url: https://cloudflare-dns.com/dns-query
method: GET
inputSpec:
- name: domain
type: STRING
required: true
outputSpec:
- name: ip
type: STRING
outputsLoads: JSON
outputTemplate: |
{"ip": "{{result.Answer[0].data}}"}
"""
async def main():
    """Run the example: register workers, install the tool, execute one request.

    Steps, in order:
      1. Build a RunnableHub and a ToolWorker, each backed by its own
         RunnableLocalFileStore directory.
      2. Register the API, process and Jinja workers followed by the tool
         worker, then print the hub's ``workers`` attribute.
      3. Load ``defineYaml`` into a ToolDefine and add it to the tool worker.
      4. Load ``requestYaml`` into a ToolRequest, start it on the hub, wait
         for it, and print the resulting context as 4-space-indented JSON.
    """
    hub = RunnableHub(store=RunnableLocalFileStore("/tmp/runnableHub/context"))
    tool_worker = ToolWorker(store=RunnableLocalFileStore("/tmp/runnableHub/tool"))

    # Each plain worker is constructed and registered one at a time, in the
    # same order as before; the tool worker goes last.
    for worker_cls in (ApiWorker, ProcessWorker, JinjaWorker):
        hub.registerWorker(worker_cls())
    hub.registerWorker(tool_worker)
    print(hub.workers)

    # YAML -> plain Python data -> JSON text -> validated model.
    define_json = json.dumps(yaml.safe_load(defineYaml))
    tool_worker.addTool(ToolDefine.model_validate_json(define_json))

    request_json = json.dumps(yaml.safe_load(requestYaml))
    request = ToolRequest.model_validate_json(request_json)

    context = await hub.executeStart(request)
    context = await hub.executeWait(context)

    # Round-trip through json so the output is formatted by json.dumps.
    context_data = json.loads(context.model_dump_json())
    print(json.dumps(context_data, indent=4))


# Only run the example when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())