def make_worker_request()

in runnable-hub/python/runnable_workers/processWorker/worker.py [0:0]


    def make_worker_request(self, step, stepOutputs, inputs, settings, jobsOutputs) -> Dict:
        """Render ``step`` and attach the worker request payload under ``"request"``.

        The render context exposes ``stepOutputs``, ``inputs``, ``settings`` and
        ``jobsOutputs`` under the keys ``steps``, ``inputs``, ``settings`` and
        ``jobs``. Steps carrying a ``shell`` or ``python`` entry additionally get
        an ``outputs`` entry holding an ``OutputRender`` built from a path
        template (presumably used to resolve output file locations while
        rendering -- confirm against ``OutputRender``).

        The runnable code is the step's explicit ``runnableCode`` when present;
        otherwise it is the upper-cased name of the first populated key among
        ``shell``, ``api``, ``jinja``, ``python``, ``tool`` and ``agent``.

        Returns:
            The object returned by ``self.complexRender`` with a ``"request"``
            entry added. For ``SHELL``/``PYTHON`` the request is a new dict
            holding the rendered payload under ``"run"``; for every other code
            the request *is* the rendered payload entry itself, so that entry
            also receives the ``"runnableCode"`` key.

        Raises:
            RuntimeError: if no runnable code can be determined for the step.
        """
        # Only shell and python steps get an output renderer; shell takes
        # precedence when both entries are present.
        output_templates = (
            ("shell", '$SHELL_RUN_PATH/{outputFileName}'),
            ("python", "os.environ.get('PYTHON_RUN_PATH')+'/{outputFileName}'"),
        )
        outputs = None
        for step_key, path_template in output_templates:
            if step.get(step_key) is not None:
                outputs = OutputRender(path_template)
                break

        context = {"steps": stepOutputs, "inputs": inputs, "settings": settings, "jobs": jobsOutputs}
        if outputs is not None:
            context["outputs"] = outputs

        rendered: Any = self.complexRender(context, step)

        # An explicit runnableCode wins; otherwise infer it from the first
        # populated payload key, checked in this fixed priority order.
        code = step.get("runnableCode")
        if code is None:
            for step_key in ("shell", "api", "jinja", "python", "tool", "agent"):
                if step.get(step_key) is not None:
                    code = step_key.upper()
                    break
            else:
                raise RuntimeError(f"step {step} has no runnableCode")

        if code in ("SHELL", "PYTHON"):
            # Script-like steps wrap the rendered script text under "run".
            request = {"runnableCode": code, "run": rendered[code.lower()]}
        else:
            # Other steps reuse the rendered payload object directly (aliased,
            # not copied), so the keys added below also appear on that entry.
            request = rendered[code.lower()]
        rendered["request"] = request

        if outputs is not None:
            request["outputs"] = outputs.outputValueMap

        request["runnableCode"] = code
        return rendered