in azure/durable_functions/models/TaskOrchestrationExecutor.py [0:0]
def process_event(self, event: HistoryEvent):
    """Dispatch one history event to the handling it requires.

    Depending on the event's type this advances the orchestration's
    deterministic timestamp, re-initializes the executor, resumes the
    user code, re-keys an open entity-call task, or records a task's
    outcome and resumes the user code. An event that matches none of
    these cases is ignored.

    Parameters
    ----------
    event : HistoryEvent
        The history event to process
    """
    kind = event.event_type

    if kind == HistoryEventType.ORCHESTRATOR_STARTED:
        # The deterministic clock is only ever moved forward.
        started_at = event.timestamp
        if started_at > self.context.current_utc_datetime:
            self.context.current_utc_datetime = started_at
        return

    if kind == HistoryEventType.CONTINUE_AS_NEW:
        # Start over from a freshly initialized orchestration state.
        self.initialize()
        return

    if kind == HistoryEventType.EXECUTION_STARTED:
        # Kick off (or continue) replay of the user's code.
        self.resume_user_code()
        return

    if kind == HistoryEventType.EVENT_SENT:
        # Only an EVENT_SENT that belongs to an open "CallEntityAction"
        # task needs bookkeeping; any other EVENT_SENT is left alone.
        sequence_id = event.event_id
        pending = self.context.open_tasks
        if sequence_id not in pending:
            return
        candidate = pending[sequence_id]
        if candidate._api_name != "CallEntityAction":
            return
        # Move the task from its sequential event id to the "id" found in
        # the event's JSON input (a GUID, per the original author's note).
        # NOTE(review): presumably later events reference the task by that
        # id; confirm against set_task_value.
        pending.pop(sequence_id)
        request_id = json.loads(event.Input)["id"]
        pending[request_id] = candidate
        return

    if self.is_task_completion_event(kind):
        # Mark the matching task as succeeded or failed, then let the user
        # code observe the new result.
        is_success, id_key = self.event_to_SetTaskValuePayload[kind]
        self.set_task_value(event, is_success, id_key)
        self.resume_user_code()