in azure/durable_functions/models/TaskOrchestrationExecutor.py [0:0]
def execute(self, context: DurableOrchestrationContext,
history: List[HistoryEvent], fn) -> str:
"""Execute an orchestration via its history to evaluate Tasks and replay events.
Parameters
----------
context : DurableOrchestrationContext
The user's orchestration context, to interact with the user code.
history : List[HistoryEvent]
The orchestration history, to evaluate tasks and replay events.
fn : function
The user's orchestration function.
Returns
-------
str
A JSON-formatted string of the user's orchestration state, payload for the extension.
"""
self.context = context
evaluated_user_code = fn(context)
# The minimum History size is 2, in the shape: [OrchestratorStarted, ExecutionStarted].
# At the start of replay, the `is_replaying` flag is determined from the
# ExecutionStarted event.
# For some reason, OrchestratorStarted does not update its `isPlayed` field.
if len(history) < 2:
err_message = "Internal Durable Functions error: "\
+ f"received History array of size {len(history)} "\
+ "when a minimum size of 2 is expected. "\
+ "Please report this issue at "\
+ "https://github.com/Azure/azure-functions-durable-python/issues."
raise Exception(err_message)
# Set initial is_replaing state.
execution_started_event = history[1]
self.current_task.is_played = execution_started_event.is_played
# If user code is a generator, then it uses `yield` statements (the DF API)
# and so we iterate through the DF history, generating tasks and populating
# them with values when the history provides them
if isinstance(evaluated_user_code, GeneratorType):
self.generator = evaluated_user_code
for event in history:
self.process_event(event)
if self.has_execution_completed:
break
# Due to backwards compatibility reasons, it's possible
# for the `continue_as_new` API to be called without `yield` statements.
# Therefore, we explicitely check if `continue_as_new` was used before
# flatting the orchestration as returned/completed
elif not self.context.will_continue_as_new:
self.orchestrator_returned = True
self.output = evaluated_user_code
return self.get_orchestrator_state_str()