in azure/durable_functions/models/DurableOrchestrationContext.py [0:0]
def __init__(self,
             history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
             parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
             maximumShortTimerDuration: str = None,
             longRunningTimerIntervalDuration: str = None, upperSchemaVersionNew: int = None,
             **kwargs):
    """Initialize the orchestration context from its constructor arguments.

    Parameters
    ----------
    history: List[Dict[Any, Any]]
        History entries; each dict is expanded as keyword arguments
        into a `HistoryEvent`.
    instanceId: str
        Stored as the instance id of this context.
    isReplaying: bool
        Stored as the replay flag of this context.
    parentInstanceId: str
        Stored as the parent instance id of this context.
    input: Any
        Orchestration input. A dict is serialized to a JSON string;
        any other value is stored unchanged.
    upperSchemaVersion: int
        Value used to select the initial `ReplaySchema`.
    maximumShortTimerDuration: str
        Timespan string parsed with `parse_timespan_attrib`, or None
        to leave the limit unset.
    longRunningTimerIntervalDuration: str
        Timespan string parsed with `parse_timespan_attrib`, or None
        to leave the interval unset.
    upperSchemaVersionNew: int
        Optional newer schema value. It only takes effect when it is
        greater than the schema selected from `upperSchemaVersion`;
        a value that is not a `ReplaySchema` member falls back to the
        highest known member.
    **kwargs
        Remaining keyword arguments, forwarded to `FunctionContext`.

    Raises
    ------
    IndexError
        If `history` contains no ORCHESTRATOR_STARTED event.
    """
    self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
    self._instance_id: str = instanceId
    self._is_replaying: bool = isReplaying
    self._parent_instance_id: str = parentInstanceId

    # Timer settings stay None unless a timespan string was supplied.
    self._maximum_short_timer_duration: datetime.timedelta = None
    if maximumShortTimerDuration is not None:
        max_short_duration = parse_timespan_attrib(maximumShortTimerDuration)
        self._maximum_short_timer_duration = max_short_duration
    self._long_timer_interval_duration: datetime.timedelta = None
    if longRunningTimerIntervalDuration is not None:
        long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration)
        self._long_timer_interval_duration = long_interval_duration

    self._custom_status: Any = None
    self._new_uuid_counter: int = 0
    self._sub_orchestrator_counter: int = 0
    self._continue_as_new_flag: bool = False

    # The first ORCHESTRATOR_STARTED event supplies the current UTC time.
    # `next` stops at the first match instead of filtering the whole
    # history; IndexError is kept as the failure type, with a clearer message.
    started_event = next(
        (e_ for e_ in self.histories
         if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED),
        None)
    if started_event is None:
        raise IndexError("history does not contain an ORCHESTRATOR_STARTED event")
    self.decision_started_event: HistoryEvent = started_event
    self._current_utc_datetime: datetime.datetime = \
        self.decision_started_event.timestamp

    self._function_context: FunctionContext = FunctionContext(**kwargs)
    self._sequence_number = 0

    # Start from `upperSchemaVersion`; upgrade only when the newer value is
    # strictly greater, clamping unknown values to the highest known schema.
    self._replay_schema = ReplaySchema(upperSchemaVersion)
    if (upperSchemaVersionNew is not None
            and upperSchemaVersionNew > self._replay_schema.value):
        valid_schema_values = [enum_member.value for enum_member in ReplaySchema]
        if upperSchemaVersionNew in valid_schema_values:
            self._replay_schema = ReplaySchema(upperSchemaVersionNew)
        else:
            self._replay_schema = ReplaySchema(max(valid_schema_values))

    self._action_payload_v1: List[List[Action]] = []
    self._action_payload_v2: List[Action] = []

    # Serialize dict inputs to a JSON string
    # (consistent with Python Functions generic trigger/input bindings);
    # any other input value is stored as-is.
    if isinstance(input, dict):
        input = json.dumps(input)
    self._input: Any = input

    self.open_tasks: DefaultDict[Union[int, str], Union[List[TaskBase], TaskBase]]
    self.open_tasks = defaultdict(list)
    self.deferred_tasks: Dict[Union[int, str], Tuple[HistoryEvent, bool, str]] = {}