runnable-hub/python/runnable_workers/agentWorker/worker.py (162 lines of code) (raw):

from runnable_hub import RunnableWorker, RunnableContext, RunnableStatus from runnable_hub.interface import RunnableFileStore, RunnableDatabaseStore from .request.agentRequest import AgentRequest from .request.agentDefine import AgentDefine from .request.agentChainTemplate import AgentChainTemplate from .request.agentFunction import AgentFunctionType from ..llmWorker.request.llmSetting import LlmSetting from ..toolWorker.worker import Worker as ToolWorker from .response import AgentResponse from typing import Dict from datetime import datetime import json # inputs -> prerun -> outputs # inputs -> chain -> outputs # inputs -> postrun -> outputs class Worker(RunnableWorker): runnableCode = "AGENT" Request = AgentRequest Response = AgentResponse def __init__(self, store: RunnableFileStore|RunnableDatabaseStore, toolWorker: ToolWorker): self.store = store self.toolWorker = toolWorker def addAgent(self, agentDefine: AgentDefine): if isinstance(self.store, RunnableFileStore): filePath = f"{agentDefine.agentCode}/{agentDefine.agentVersion}/define.json" self.store.saveFile(filePath, agentDefine.model_dump_json()) else: raise Exception("Not supported") def addChainTemplate(self, chainTemplateCode: str, template: AgentChainTemplate): if isinstance(self.store, RunnableFileStore): filePath = f"chainTemplates/{chainTemplateCode}/template.json" self.store.saveFile(filePath, template.model_dump_json()) else: raise Exception("Not supported") def addLlm(self, llmCode: str, llmSetting: LlmSetting): if isinstance(self.store, RunnableFileStore): filePath = f"llm/{llmCode}.json" self.store.saveFile(filePath, llmSetting.model_dump_json()) else: raise Exception("Not supported") def readAgent(self, agentCode:str, agentVersion:str) -> AgentDefine: if isinstance(self.store, RunnableFileStore): filePath = f"{agentCode}/{agentVersion}/define.json" return AgentDefine.model_validate_json(self.store.readFile(filePath)) else: raise Exception("Not supported") def readChainTemplate(self, chainTemplateCode: str) -> AgentChainTemplate: if isinstance(self.store, RunnableFileStore): filePath = f"chainTemplates/{chainTemplateCode}/template.json" return AgentChainTemplate.model_validate_json(self.store.readFile(filePath)) else: raise Exception("Not supported") def readLllm(self, llmCode: str) -> LlmSetting: if isinstance(self.store, RunnableFileStore): filePath = f"llm/{llmCode}.json" return LlmSetting.model_validate_json(self.store.readFile(filePath)) else: raise Exception("Not supported") async def onNext(self, context: RunnableContext[AgentRequest, AgentResponse]) -> RunnableContext: agentDefine = self.readAgent(context.request.agentCode, context.request.agentVersion) if context.data.get("inputs") is None: context.data["inputs"] = context.request.inputs context.data["outputs"] = {} context.data["stage"] = [] if agentDefine.prerun is not None: context.data["stage"].append("prerun") if agentDefine.chainTemplate is not None or agentDefine.chainTemplateCode is not None: context.data["stage"].append("chain") if agentDefine.postrun is not None: context.data["stage"].append("postrun") # prerun if len(context.data["stage"]) > 0 and context.data["stage"][0] == "prerun" and agentDefine.prerun is not None: if context.data.get("sendPrerunRequest") is None: prerunRequest = { "runnableCode": "PROCESS", "inputs": context.request.inputs, "jobs": agentDefine.prerun.get("jobs", {}), } if agentDefine.prerun.get("chainInputs") is not None: prerunRequest["outputs"] = agentDefine.prerun.get("chainInputs") elif agentDefine.prerun.get("outputs") is not None: prerunRequest["outputs"] = agentDefine.prerun.get("outputs") context.promise.resolve["prerun"] = prerunRequest context.data["sendPrerunRequest"] = datetime.now() return context else: if context.promise.result.get("prerun") is None: raise ValueError("prerun response is missing") # 将prerun的执行outputs更新到inputs或outputs if agentDefine.prerun.get("chainInputs") is not None: context.data["inputs"].update(context.promise.result["prerun"]["outputs"]) elif agentDefine.prerun.get("outputs") is not None: context.data["outputs"] = context.promise.result["prerun"]["outputs"] context.data["stage"].remove("prerun") # chain if len(context.data["stage"]) > 0 and context.data["stage"][0] == "chain" \ and (agentDefine.chainTemplate is not None or agentDefine.chainTemplateCode is not None): if context.data.get("sendChainRequest") is None: if agentDefine.chainTemplate is not None: chainTemplate = { "systemPrompt": agentDefine.chainTemplate.systemPrompt, "userPrompt": agentDefine.chainTemplate.userPrompt, "onNext": agentDefine.chainTemplate.onNext } elif agentDefine.chainTemplateCode is not None: chainTemplateByCode = self.readChainTemplate(agentDefine.chainTemplateCode) chainTemplate = { "systemPrompt": chainTemplateByCode.systemPrompt, "userPrompt": chainTemplateByCode.userPrompt, "onNext": chainTemplateByCode.onNext } if agentDefine.llm is not None: llm = agentDefine.llm elif agentDefine.llmCode is not None: llm = self.readLllm(agentDefine.llmCode) else: raise Exception("No llm") chainFunctions = [] toolCodeVersions = [f"{f.name}:{f.version}" for f in agentDefine.functions if f.type == AgentFunctionType.TOOL] toolMap = {tool.toolCode: tool for tool in await self.toolWorker.readTools(toolCodeVersions)} for f in agentDefine.functions: if f.type == AgentFunctionType.TOOL: chainFunctions.append({ "type": "TOOL", "name": f.name, "version": f.version, "description": toolMap[f.name].description, "inputDefine": toolMap[f.name].inputSpec, "presetInputs": f.presetInputs, }) chainRequest = { "runnableCode": "CHAIN", "data": { "inputs": context.data["inputs"], }, "llm": llm, "functions": chainFunctions, } chainRequest.update(chainTemplate) context.promise.resolve["chain"] = chainRequest context.data["sendChainRequest"] = datetime.now() return context else: if context.promise.result["chain"] is None: raise ValueError("chain response is missing") if agentDefine.postrun is None: context.data["outputs"] = context.promise.result["chain"]["finalAnswer"] else: context.data["inputs"]["chainAnswer"] = context.promise.result["chain"]["finalAnswer"] context.data["stage"].remove("chain") # postrun if len(context.data["stage"]) > 0 and context.data["stage"][0] == "postrun" and agentDefine.postrun is not None: if context.data.get("sendPostrunRequest") is None: prerunRequest = { "runnableCode": "PROCESS", "inputs": context.data["inputs"], "jobs": agentDefine.postrun.get("jobs", {}), "outputs": agentDefine.postrun.get("outputs"), } context.promise.resolve["postrun"] = prerunRequest context.data["sendPostrunRequest"] = datetime.now() return context else: if context.promise.result["postrun"] is None: raise ValueError("postrun response is missing") context.data["outputs"] = context.promise.result["postrun"]["outputs"] context.status = RunnableStatus.SUCCESS context.response = AgentResponse(success=True, outputs=context.data["outputs"]) return context