azure/durable_functions/models/OrchestratorState.py (61 lines of code) (raw):

import json
from typing import List, Any, Dict, Optional

from azure.durable_functions.models.ReplaySchema import ReplaySchema
from .utils.json_utils import add_attrib
from azure.durable_functions.models.actions.Action import Action
from azure.functions._durable_functions import _serialize_custom_object


class OrchestratorState:
    """Orchestration State.

    Used to communicate the state of the orchestration back to the durable
    extension
    """

    def __init__(self,
                 is_done: bool,
                 actions: List[List[Action]],
                 output: Any,
                 replay_schema: ReplaySchema,
                 error: Optional[str] = None,
                 custom_status: Any = None):
        """Store the values that make up the orchestration state.

        Parameters
        ----------
        is_done: bool
            Whether this is the last execution of the orchestrator instance.
        actions: List[List[Action]]
            The scheduled actions, grouped by execution.
        output: Any
            The value returned by the orchestrator; omitted from the JSON
            payload when it is None.
        replay_schema: ReplaySchema
            The replay schema this payload is expressed in.
        error: Optional[str]
            The error received when running the orchestration, if any.
        custom_status: Any
            The custom status value; omitted from the JSON payload when it
            is None.
        """
        self._is_done: bool = is_done
        self._actions: List[List[Action]] = actions
        self._output: Any = output
        self._error: Optional[str] = error
        self._custom_status: Any = custom_status
        self._replay_schema: ReplaySchema = replay_schema

    @property
    def actions(self) -> List[List[Action]]:
        """Get the ordered list of async actions the orchestrator function should perform.

        This list is append-only; it must contain all scheduled async actions
        up to the latest requested work, even actions that have already been
        completed.

        Actions are grouped by execution. Each subsequent orchestrator
        execution should add a new array of action objects to the collection.
        """
        return self._actions

    @property
    def is_done(self) -> bool:
        """Get indicator of whether this is the last execution of this orchestrator instance.

        When this value is true, the Durable Functions extension will consider
        the orchestration instance completed and will attempt to return the
        output value.
        """
        return self._is_done

    @property
    def output(self) -> Any:
        """Get the JSON-serializable value returned by the orchestrator instance completion.

        Optional.
        """
        return self._output

    @property
    def error(self) -> Optional[str]:
        """Get the error received when running the orchestration.

        Optional.
        """
        return self._error

    @property
    def custom_status(self) -> Any:
        """Get the JSON-serializable value used by DurableOrchestrationContext.SetCustomStatus."""
        return self._custom_status

    @property
    def schema_version(self):
        """Get the Replay Schema represented in this OrchestratorState payload."""
        return self._replay_schema.value

    def to_json(self) -> Dict[str, Any]:
        """Convert object into a json dictionary.

        'actions' is always written and 'isDone' is added through
        ``add_attrib``. 'schemaVersion' is only added when the replay schema
        is not ``ReplaySchema.V1``. 'output' and 'customStatus' are written
        whenever their value is not None, and 'error' whenever it is truthy.

        Returns
        -------
        Dict[str, Any]
            The instance of the class converted into a json dictionary
        """
        json_dict: Dict[str, Any] = {}
        add_attrib(json_dict, self, '_is_done', 'isDone')
        if self._replay_schema != ReplaySchema.V1:
            add_attrib(json_dict, self, 'schema_version', 'schemaVersion')
        self._add_actions(json_dict)
        if self._output is not None:
            json_dict['output'] = self._output
        # Truthiness is intentional here: an empty error string is treated
        # the same as "no error" and is left out of the payload.
        if self._error:
            json_dict['error'] = self._error
        # Compare against None rather than relying on truthiness so that
        # falsy-but-valid statuses (0, False, "", [], {}) are not dropped.
        if self._custom_status is not None:
            json_dict['customStatus'] = self._custom_status
        return json_dict

    def _add_actions(self, json_dict):
        """Store the JSON form of every action under the 'actions' key.

        The per-execution grouping of ``self._actions`` is preserved: each
        inner list becomes a list of ``to_json()`` results.

        Parameters
        ----------
        json_dict
            The dictionary to update in place.
        """
        json_dict['actions'] = [
            [action_obj.to_json() for action_obj in action_list]
            for action_list in self._actions
        ]

    def to_json_string(self) -> str:
        """Convert object into a json string.

        The dictionary produced by ``to_json`` is serialized with
        ``json.dumps``, using ``_serialize_custom_object`` as the ``default``
        hook for values the encoder cannot serialize on its own.

        Returns
        -------
        str
            The instance of the object in json string format
        """
        json_dict = self.to_json()
        return json.dumps(json_dict, default=_serialize_custom_object)