runnable-hub/python/runnable_hub/context.py (36 lines of code) (raw):

"""Pydantic data models describing a runnable's request, response and context."""

from pydantic import BaseModel
from typing import List, Dict, Optional, Generic, TypeVar
from datetime import datetime
from enum import Enum

# Type parameters of RunnableContext: T is the type of its ``request`` field,
# R the type of its (optional) ``response`` field.
T = TypeVar("T")
R = TypeVar("R")


class RunnableStatus(Enum):
    """Status values a RunnableContext can carry; each value equals its name."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"


class RunnableRequest(BaseModel):
    """Model with a single required string field, ``runnableCode``.

    NOTE(review): presumably the base class for concrete request payloads --
    confirm against the modules that subclass it.
    """

    runnableCode: str


class RunnableResponse(BaseModel):
    """Model with a single required string field, ``runnableCode``.

    NOTE(review): presumably the base class for concrete response payloads --
    confirm against the modules that subclass it.
    """

    runnableCode: str


class RunnablePromise(BaseModel):
    """Three string-keyed mappings, each empty by default.

    NOTE(review): the meaning of the keys and values is not visible in this
    file -- presumably they track nested runnable calls; verify with callers.
    """

    # pydantic copies mutable field defaults for every instance, so these
    # literal ``{}`` defaults are not shared between models.
    resolve: Dict[str, Dict] = {}
    result: Dict[str, Dict] = {}
    reject: Dict[str, str] = {}


class RunnableContext(BaseModel, Generic[T, R]):
    """Generic record bundling a request, its optional response and metadata.

    Required fields (no default): ``request``, ``executeId``, ``createTime``,
    ``status``, ``runnableCode`` and ``storePath``. Every other field is
    optional or has a default value.
    """

    # Request/response payloads, typed by the generic parameters.
    request: T
    response: Optional[R] = None
    promise: RunnablePromise = RunnablePromise()

    # Identity and timestamps; start/end stay None until set by the caller.
    executeId: str
    createTime: datetime
    startTime: Optional[datetime] = None
    endTime: Optional[datetime] = None

    # Current status plus an optional error message.
    status: RunnableStatus
    errorMessage: Optional[str] = None

    # NOTE(review): presumably the nesting level of this call -- confirm.
    callDepth: int = 0
    # Untyped free-form dictionary, empty by default.
    data: Dict = {}

    # Codes/ids for this runnable and, optionally, its parent.
    runnableCode: str
    parentRunnableCode: Optional[str] = None
    parentExecuteId: Optional[str] = None

    name: Optional[str] = None
    # NOTE(review): presumably where this context is persisted -- confirm.
    storePath: str