runnable-hub/python/runnable_workers/processWorker/worker.py (156 lines of code) (raw):
from datetime import datetime
from typing import Dict, Any, List
from runnable_hub import RunnableWorker, RunnableContext, RunnableStatus
from .request.processRequest import ProcessRequest
from .request.processStep import ProcessStep
from .response import ProcessResponse
from jinja2 import Environment
from uuid import uuid4
import json
class OutputRender:
def __init__(self, pathValueTemplate):
self.outputValueMap = {}
self.pathValueTemplate = pathValueTemplate
def __getattr__(self, name):
self.outputValueMap[name] = str(uuid4())[:8]+".txt"
return {"path": self.pathValueTemplate.format(outputFileName=self.outputValueMap[name])}
def save_filter(value, target_dict):
target_dict["save"] = value
return value
class Worker(RunnableWorker):
runnableCode = "PROCESS"
Request = ProcessRequest
Response = ProcessResponse
def __init__(self):
self.jinjaNewEnv = Environment(
variable_start_string="${{", # 使用 ${{ 作为变量开始标记
variable_end_string="}}", # 使用 }} 作为变量结束标记
)
self.jinjaNewEnv.filters["save"] = save_filter
def complexRender(self, data: Dict, target):
if isinstance(target, dict):
return {self.complexRender(data, key): self.complexRender(data, value) for key, value in target.items()}
elif isinstance(target, list):
return [self.complexRender(data, value) for value in target]
elif isinstance(target, str):
target = target.strip()
# 如果整个字符串只有 ${{ ... }} 则直接取里面的内容,保留变量类型
if target.startswith("${{") and target.endswith("}}"):
fetchValue = {}
self.jinjaNewEnv.from_string(target[:-2] + "|save(fetchValue)}}").render(fetchValue=fetchValue, **data)
return fetchValue["save"]
else:
return self.jinjaNewEnv.from_string(target).render(**data)
else:
return target
def getAllJobsOutputs(self, runtime) -> Dict:
jobsOutputs = {}
for jobId, job in runtime.items():
if job["jobStatus"] == "SUCCESS":
jobsOutputs[jobId] = {
"outputs": job["outputs"]
}
return jobsOutputs
def make_worker_request(self, step, stepOutputs, inputs, settings, jobsOutputs) -> Dict:
renderData = {"steps": stepOutputs, "inputs": inputs, "settings": settings, "jobs": jobsOutputs}
outputs = None
if step.get("shell") is not None:
outputs = OutputRender('$SHELL_RUN_PATH/{outputFileName}')
elif step.get("python") is not None:
outputs = OutputRender("os.environ.get('PYTHON_RUN_PATH')+'/{outputFileName}'")
if outputs is not None:
renderData["outputs"] = outputs
stepRender:Any = self.complexRender(renderData, step)
if step.get("runnableCode") is not None:
runnableCode = step["runnableCode"]
elif step.get("shell") is not None:
runnableCode = "SHELL"
elif step.get("api") is not None:
runnableCode = "API"
elif step.get("jinja") is not None:
runnableCode = "JINJA"
elif step.get("python") is not None:
runnableCode = "PYTHON"
elif step.get("tool") is not None:
runnableCode = "TOOL"
elif step.get("agent") is not None:
runnableCode = "AGENT"
else:
raise RuntimeError(f"step {step} has no runnableCode")
if runnableCode in ["SHELL", "PYTHON"]:
stepRender["request"] = {
"runnableCode": runnableCode,
"run": stepRender[runnableCode.lower()],
}
else:
stepRender["request"] = stepRender[runnableCode.lower()]
if outputs is not None:
stepRender["request"]["outputs"] = outputs.outputValueMap
stepRender["request"]["runnableCode"] = runnableCode
return stepRender
async def onNext(self, context: RunnableContext[ProcessRequest, ProcessResponse]) -> RunnableContext:
if context.data.get("runtime") is None:
context.data["runtime"] = {}
for jobId, job in context.request.jobs.items():
if len(job.steps) == 0:
raise RuntimeError(f"jobId {jobId} steps is empty")
context.data["runtime"][jobId] = {
"needs": job.needs[:],
"steps": [step.model_dump() for step in job.steps],
"stepOutputs": {},
"errors": {},
"currentStepId": None,
"startTime": datetime.now(),
"outputs": job.outputs,
}
if len(job.needs) == 0:
context.data["runtime"][jobId]["jobStatus"] = "RUNNING"
context.data["runtime"][jobId]["stepStatus"] = "RUNNING"
else:
context.data["runtime"][jobId]["jobStatus"] = "PENDING"
context.data["runtime"][jobId]["stepStatus"] = "PENDING"
for jobId,job in context.data["runtime"].items():
if job["jobStatus"] != "RUNNING":
continue
if context.promise.result.get(jobId) is not None:
job["stepOutputs"][job["currentStepId"]] = context.promise.result[jobId]
job["currentStepId"] = None
job["stepStatus"] = "SUCESS"
elif context.promise.reject.get(jobId) is not None:
job["errors"][job["currentStepId"]] = context.promise.reject[jobId]
job["currentStepId"] = None
job["stepStatus"] = "ERROR"
if job["currentStepId"] is not None: # job not finish
continue
if job["stepStatus"] == "ERROR":
job["jobStatus"] = "ERROR"
job["endTime"] = datetime.now()
for checkJob in context.data["runtime"].values():
checkJob["jobStatus"] = "ERROR"
checkJob["endTime"] = datetime.now()
elif len(job["steps"]) > 0:
self.getAllJobsOutputs(context.data["runtime"])
step = self.make_worker_request(job["steps"].pop(0), job["stepOutputs"], context.request.inputs,
context.request.settings, self.getAllJobsOutputs(context.data["runtime"]))
job["currentStepId"] = step["id"]
context.promise.resolve[jobId] = step["request"]
else:
job["jobStatus"] = "SUCCESS"
job["endTime"] = datetime.now()
if job["outputs"] is not None:
job["outputs"] = self.complexRender({"steps": job["stepOutputs"], "inputs": context.request.inputs}, job["outputs"])
for checkJob in context.data["runtime"].values():
if jobId in checkJob["needs"]:
checkJob["needs"].remove(jobId)
if len(checkJob["needs"]) == 0 and checkJob["jobStatus"] == "PENDING":
checkJob["jobStatus"] = "RUNNING"
checkJob["stepStatus"] = "RUNNING"
allStatus = set([r["jobStatus"] for r in context.data["runtime"].values()])
if allStatus == set(["SUCCESS"]):
context.status = RunnableStatus.SUCCESS
if context.request.outputs is not None:
jobOutputs = {jobId: {"outputs":job["outputs"]} for jobId, job in context.data["runtime"].items()}
processOutputs = self.complexRender({"jobs": jobOutputs, "inputs": context.request.inputs}, context.request.outputs)
context.response = ProcessResponse(outputs=processOutputs) # type: ignore
elif allStatus == set(["ERROR"]) or allStatus == set(["ERROR","SUCCESS"]):
context.status = RunnableStatus.ERROR
return context