runnable-hub/python/runnable_workers/chainWorker/next.py (20 lines of code) (raw):
import os
import json
with open(os.environ.get('PYTHON_RUN_PATH')+"/completion", "r") as h:
completion = h.read()
function = None
if os.path.exists(os.environ.get('PYTHON_RUN_PATH')+"/function"):
with open(os.environ.get('PYTHON_RUN_PATH')+"/function", "r") as h:
function = h.read()
try:
context = onNext(ChainContext(message=completion, function=function))
if context.do.overwriteMessage is not None:
print("<overwriteMessage>" + context.do.overwriteMessage + "</overwriteMessage>")
if context.do.message is not None:
print("<message>" + context.do.message + "</message>")
elif context.do.function is not None:
print("<function>" + json.dumps(context.do.function.model_dump()) + "</function>")
elif context.do.finalAnswer is not None:
print("<finalAnswer>" + context.do.finalAnswer + "</finalAnswer>")
except Exception as e:
print("<error>"+str(e)+"</error>")