in runnable-hub/python/runnable_workers/processWorker/worker.py [0:0]
def make_worker_request(self, step, stepOutputs, inputs, settings, jobsOutputs) -> Dict:
renderData = {"steps": stepOutputs, "inputs": inputs, "settings": settings, "jobs": jobsOutputs}
outputs = None
if step.get("shell") is not None:
outputs = OutputRender('$SHELL_RUN_PATH/{outputFileName}')
elif step.get("python") is not None:
outputs = OutputRender("os.environ.get('PYTHON_RUN_PATH')+'/{outputFileName}'")
if outputs is not None:
renderData["outputs"] = outputs
stepRender:Any = self.complexRender(renderData, step)
if step.get("runnableCode") is not None:
runnableCode = step["runnableCode"]
elif step.get("shell") is not None:
runnableCode = "SHELL"
elif step.get("api") is not None:
runnableCode = "API"
elif step.get("jinja") is not None:
runnableCode = "JINJA"
elif step.get("python") is not None:
runnableCode = "PYTHON"
elif step.get("tool") is not None:
runnableCode = "TOOL"
elif step.get("agent") is not None:
runnableCode = "AGENT"
else:
raise RuntimeError(f"step {step} has no runnableCode")
if runnableCode in ["SHELL", "PYTHON"]:
stepRender["request"] = {
"runnableCode": runnableCode,
"run": stepRender[runnableCode.lower()],
}
else:
stepRender["request"] = stepRender[runnableCode.lower()]
if outputs is not None:
stepRender["request"]["outputs"] = outputs.outputValueMap
stepRender["request"]["runnableCode"] = runnableCode
return stepRender