runnable-hub/python/runnable_workers/toolWorker/worker.py (81 lines of code) (raw):
from runnable_hub import RunnableWorker, RunnableContext, RunnableStatus
from runnable_hub.interface import RunnableFileStore, RunnableDatabaseStore
from .request.toolRequest import ToolRequest
from .request.toolDefine import ToolDefine, ToolType
from .response import ToolResponse
from typing import Dict, List
from datetime import datetime
import json
class Worker(RunnableWorker):
    """Runnable worker registered under the code ``"TOOL"``.

    Tool definitions are persisted as JSON at
    ``{toolCode}/{toolVersion}/define.json`` in the configured store.  Only
    ``RunnableFileStore`` is implemented; any other store makes the
    persistence methods raise ``Exception("Not supported")``.

    ``onNext`` runs a tool in two passes: the first pass builds a ``PROCESS``
    request and records it on the context promise, the second pass copies the
    outputs of that request into a ``ToolResponse``.
    """

    runnableCode = "TOOL"
    Request = ToolRequest
    Response = ToolResponse

    def __init__(self, store: RunnableFileStore|RunnableDatabaseStore):
        """Keep a reference to the store that holds the tool definitions."""
        self.store = store

    @staticmethod
    def _definePath(toolCode: str, toolVersion: str) -> str:
        """Return the store path of the definition for one tool version."""
        return f"{toolCode}/{toolVersion}/define.json"

    def _fileStore(self) -> RunnableFileStore:
        """Return the store if it is file based, otherwise raise "Not supported"."""
        if isinstance(self.store, RunnableFileStore):
            return self.store
        raise Exception("Not supported")

    def addTool(self, toolDefine: ToolDefine):
        """Serialize ``toolDefine`` to JSON and save it in the file store.

        Raises:
            Exception: if the configured store is not a ``RunnableFileStore``.
        """
        self._fileStore().saveFile(
            self._definePath(toolDefine.toolCode, toolDefine.toolVersion),
            toolDefine.model_dump_json(),
        )

    def readTool(self, toolCode:str, toolVersion:str) -> ToolDefine:
        """Load and validate the stored definition of ``toolCode``/``toolVersion``.

        Raises:
            Exception: if the configured store is not a ``RunnableFileStore``.
        """
        rawDefine = self._fileStore().readFile(self._definePath(toolCode, toolVersion))
        return ToolDefine.model_validate_json(rawDefine)

    async def readTools(self, toolCodeVersions: List[str]) -> List[ToolDefine]:
        """Load several tool definitions, in the order given.

        Args:
            toolCodeVersions: entries of the form ``"<toolCode>:<toolVersion>"``,
                for example ``["get_domain_ip:v1"]``.  Only the first two
                ``":"``-separated fields of an entry are used.

        Raises:
            Exception: if the configured store is not a ``RunnableFileStore``
                (checked up front, so it is raised even for an empty list).
            IndexError: if an entry contains no ``":"``.
        """
        self._fileStore()
        toolDefines = []
        for toolCodeVersion in toolCodeVersions:
            # Split once per entry instead of once per field.
            parts = toolCodeVersion.split(":")
            toolDefines.append(self.readTool(parts[0], parts[1]))
        return toolDefines

    def _buildProcessRequest(self, toolDefine: ToolDefine, inputs) -> Dict:
        """Build the ``PROCESS`` request that executes ``toolDefine`` with ``inputs``.

        The request holds a single job ``call`` made of three steps:
        ``setting`` renders the tool setting through a jinja step, ``call``
        feeds the rendered setting to the step type matching the tool type,
        and ``result`` renders the tool's output template from the call
        outputs.  The ``${{ ... }}`` strings are passed through verbatim;
        presumably the PROCESS worker resolves them -- TODO confirm.

        Raises:
            Exception: if the tool type is anything other than ``ToolType.API``.
        """
        if toolDefine.toolType == ToolType.API:
            processStepName = "api"
        else:
            raise Exception("Not supported")

        settingStep = {
            "id": "setting",
            "jinja": {
                "data": {
                    "inputs": inputs
                },
                "outputLoads": "JSON",
                # The setting is dumped to JSON text and used as the template.
                "template": json.dumps(toolDefine.setting),
            }
        }
        callStep = {
            "id": "call",
            processStepName: "${{ steps.setting.outputs }}"
        }
        resultStep = {
            "id": "result",
            "jinja": {
                "data": {
                    "result": "${{ steps.call.outputs }}"
                },
                "outputLoads": toolDefine.outputsLoads,
                "template": toolDefine.outputTemplate
            }
        }
        return {
            "runnableCode": "PROCESS",
            "outputs": "${{ jobs.call.outputs }}",
            "jobs": {
                "call": {
                    "outputs": "${{ steps.result.outputs }}",
                    "steps": [settingStep, callStep, resultStep]
                }
            }
        }

    async def onNext(self, context: RunnableContext[ToolRequest, ToolResponse]) -> RunnableContext:
        """Advance the tool run described by ``context`` by one pass.

        First pass (``context.data`` has no ``"sendProcessRequest"`` entry):
        read the tool definition, store the PROCESS request under
        ``context.promise.resolve["todo"]`` and mark the request as sent.

        Later pass: read ``context.promise.result["todo"]``, expose its
        ``"outputs"`` in a successful ``ToolResponse`` and set the status to
        ``RunnableStatus.SUCCESS``.

        Raises:
            Exception: on an unsupported store or tool type (first pass), or
                when the ``"todo"`` result is not available yet (later pass).
        """
        if context.data.get("sendProcessRequest") is None:
            # The definition is only needed to build the request, so it is
            # read here and not on the completion pass.
            toolDefine = self.readTool(context.request.toolCode, context.request.toolVersion)
            context.promise.resolve["todo"] = self._buildProcessRequest(
                toolDefine, context.request.inputs
            )
            # NOTE(review): naive local timestamp; this method only checks
            # that the entry is present.
            context.data["sendProcessRequest"] = datetime.now()
            return context

        result = context.promise.result.get("todo")
        if result is None:
            raise Exception("Process request not finish")
        context.response = ToolResponse(
            success=True,
            outputs=result["outputs"]
        )
        context.status = RunnableStatus.SUCCESS
        return context