runnable-hub/python/runnable_workers/pythonWorker/worker.py (52 lines of code) (raw):

from runnable_hub import RunnableWorker, RunnableContext, RunnableStatus from .request import PythonRequest from .response import PythonResponse import sys import asyncio import shutil import os class Worker(RunnableWorker): runnableCode = "PYTHON" Request = PythonRequest Response = PythonResponse pythonBin = sys.executable def __init__(self, storePath = "/tmp/python"): self.storePath = storePath if not os.path.exists(self.storePath): os.makedirs(self.storePath) @staticmethod async def run_python(command, cwd=None, env=None): process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env ) stdout, stderr = await process.communicate() return process.returncode, stdout.decode(), stderr.decode() async def onNext(self, context: RunnableContext[PythonRequest, PythonResponse]) -> RunnableContext: # create a temporary path to store script temporary_path = os.path.join(self.storePath, context.executeId) if not os.path.exists(temporary_path): os.makedirs(temporary_path) try: temp_file = f"{temporary_path}/run.py" with open(temp_file, "w") as h: h.write(context.request.run) for fileName, fileContent in context.request.data.items(): if context.request.outputs is not None and fileName in context.request.outputs: raise Exception(f"Data file name '{fileName}' cannot be the same as output file name") with open(f"{temporary_path}/{fileName}", "w") as h: h.write(fileContent) returncode, stdout, stderr = await self.run_python( [self.pythonBin, temp_file], cwd=temporary_path, env={"PYTHON_RUN_PATH": temporary_path}) outputs = {} if context.request.outputs is not None: for key, fileName in context.request.outputs.items(): with open(f"{temporary_path}/{fileName}", "r") as h: outputs[key] = h.read().strip() context.response = PythonResponse(returncode=returncode, stdout=stdout, stderr=stderr, outputs=outputs) context.status = RunnableStatus.SUCCESS finally: shutil.rmtree(temporary_path) return context