runnable-hub/python/runnable_workers/jinjaWorker/worker.py (24 lines of code) (raw):
from runnable_hub import RunnableWorker, RunnableContext, RunnableStatus, RunnableOutputLoads
from .request.jinjaRequest import JinjaRequest
from .response import JinjaResponse
from jinja2 import Environment
import json
class Worker(RunnableWorker):
runnableCode = "JINJA"
Request = JinjaRequest
Response = JinjaResponse
def __init__(self):
self.jinjaEnv = Environment()
async def onNext(self, context: RunnableContext[JinjaRequest, JinjaResponse]) -> RunnableContext:
renderData = context.request.data
rawResult = self.jinjaEnv.from_string(context.request.template).render(**renderData)
if context.request.outputLoads == RunnableOutputLoads.TEXT:
context.response = JinjaResponse(result=rawResult, outputs=rawResult)
context.status = RunnableStatus.SUCCESS
elif context.request.outputLoads == RunnableOutputLoads.JSON:
context.response = JinjaResponse(result=rawResult, outputs=json.loads(rawResult))
context.status = RunnableStatus.SUCCESS
else:
context.status = RunnableStatus.ERROR
context.errorMessage = "resultFormat not support"
return context