runnable-hub/python/runnable_hub/hub.py (133 lines of code) (raw):
from datetime import datetime
import asyncio
import uuid
from abc import ABC, abstractmethod
from .context import RunnableRequest, RunnableContext, RunnableStatus
from .interface import RunnableFileStore, RunnableQueueBus, RunnableQueue
from .queue import RunnableLocalQueueBus
import os
import traceback
class RunnableWorker(ABC):
runnableCode = None
Request = None
Response = None
@abstractmethod
async def onNext(self, context: RunnableContext) -> RunnableContext:
pass
class RunnableHub():
def __init__(self, store: RunnableFileStore, queueBus: RunnableQueueBus = RunnableLocalQueueBus()):
self.workers = {}
self.requests = {}
self.responses = {}
self.store = store
self.queueBus = queueBus
@staticmethod
def shortExecuteId(executeId):
return executeId.split("-")[0]
def registerWorker(self, worker: RunnableWorker):
self.workers[worker.runnableCode] = RunnableWorkerDispatch(
worker=worker,
queue=self.queueBus.register(worker.runnableCode),
store=self.store,
hub=self
)
self.requests[worker.runnableCode] = worker.Request
self.responses[worker.runnableCode] = worker.Response
def readExecuteContext(self, storePath:str, runnableCode: str) -> RunnableContext:
return RunnableContext[self.requests[runnableCode], self.responses[runnableCode]].model_validate_json(
self.store.readFile(f"{storePath}/context.json"))
async def executeStart(self, request: RunnableRequest, parentContext: RunnableContext|None = None, name: str|None = None) -> RunnableContext:
executeId=str(uuid.uuid4())
if parentContext is None:
storePath = f"execute/{executeId}"
parentExecuteId = None
parentRunnableCode = None
callDepth = 0
else:
storePath = f"{parentContext.storePath}/{self.shortExecuteId(executeId)}"
parentExecuteId = parentContext.executeId
parentRunnableCode = parentContext.runnableCode
callDepth = parentContext.callDepth + 1
newContext = RunnableContext(
executeId=executeId,
runnableCode=request.runnableCode,
request=request,
response=None,
name=name,
callDepth=callDepth,
storePath=storePath,
parentExecuteId=parentExecuteId,
parentRunnableCode=parentRunnableCode,
createTime=datetime.now(),
status=RunnableStatus.PENDING)
self.store.saveFile(f"{newContext.storePath}/context.json", newContext.model_dump_json())
await self.workers[newContext.runnableCode].queue.send(f"{newContext.storePath}/context.json|")
return newContext
async def parentExecuteNext(self, context: RunnableContext):
if context.parentExecuteId is None or context.parentRunnableCode is None or context.name is None:
return
parentStorePath = os.path.dirname(context.storePath)
await self.workers[context.parentRunnableCode].queue.send(f"{parentStorePath}/context.json|{context.runnableCode}#{context.name}={self.shortExecuteId(context.executeId)}")
async def executeWait(self, context: RunnableContext) -> RunnableContext:
while context.status not in [RunnableStatus.ERROR, RunnableStatus.SUCCESS]:
await asyncio.sleep(1)
context = self.readExecuteContext(context.storePath, context.runnableCode)
return context
class RunnableWorkerDispatch():
worker: RunnableWorker
queue: RunnableQueue
store: RunnableFileStore
hub: RunnableHub
def __init__(self, worker: RunnableWorker, queue: RunnableQueue, store: RunnableFileStore, hub: RunnableHub):
self.worker = worker
self.queue = queue
self.store = store
self.hub = hub
asyncio.create_task(self.run())
async def run(self):
while True:
print(f"RunnableWorkerDispatch {self.worker.runnableCode} wait")
message = await self.queue.receive()
contextFile, callbacks = message.split("|", 1)
if callbacks != "":
print(f"RunnableWorkerDispatch {self.worker.runnableCode} get message {contextFile} with callback {callbacks}")
else:
print(f"RunnableWorkerDispatch {self.worker.runnableCode} get message {contextFile}")
context = RunnableContext[self.worker.Request, self.worker.Response].model_validate_json(
self.store.readFile(contextFile))
if callbacks != "":
for callback in callbacks.split(","):
# runnableCode#name=executeShortId
runnableCodeAndName, executeShortId = callback.split('=', 1)
callbackRunnableCode, name = runnableCodeAndName.split('#', 1)
callbackContext = self.hub.readExecuteContext(f"{context.storePath}/{executeShortId}", callbackRunnableCode)
if callbackContext.status == RunnableStatus.SUCCESS:
context.promise.result[name] = callbackContext.response.model_dump() if callbackContext.response is not None else {}
elif callbackContext.status == RunnableStatus.ERROR:
context.promise.reject[name] = callbackContext.errorMessage if callbackContext.errorMessage is not None else ""
else:
print(f"context {context.storePath}/{executeShortId} not in finish status")
if context.status == RunnableStatus.PENDING:
context.startTime = datetime.now()
context.status = RunnableStatus.RUNNING
elif context.status in [RunnableStatus.ERROR, RunnableStatus.SUCCESS]:
print(f"context {context.executeId} has been finished, status={context.status}")
continue
try:
context = await self.worker.onNext(context)
except Exception as e:
context.status = RunnableStatus.ERROR
context.errorMessage = f"Error: {str(e)}\nStack Trace:\n{traceback.format_exc()}"
# 执行器读取完成后将此结果置空
context.promise.result = {}
print(context)
if context.status in [RunnableStatus.ERROR, RunnableStatus.SUCCESS]:
context.endTime = datetime.now()
self.store.saveFile(contextFile, context.model_dump_json())
await self.hub.parentExecuteNext(context)
elif context.status == RunnableStatus.RUNNING:
todo = context.promise.resolve
context.promise.resolve = {}
self.store.saveFile(contextFile, context.model_dump_json())
while todo:
name, req = todo.popitem()
print(f"get todo {name} {req}")
await self.hub.executeStart(self.hub.requests[req['runnableCode']](**req), context, name)