in azure/durable_functions/models/TaskOrchestrationExecutor.py [0:0]
def set_task_value(self, event: HistoryEvent, is_success: bool, id_key: str):
"""Set a running task to either a success or failed state, and sets its value.
Parameters
----------
event : HistoryEvent
The history event containing the value for the Task
is_success : bool
Whether the Task succeeded or failed (throws exception)
id_key : str
The attribute in the event object containing the ID of the Task to target
"""
def parse_history_event(directive_result):
"""Based on the type of event, parse the JSON.serializable portion of the event."""
event_type = directive_result.event_type
if event_type is None:
raise ValueError("EventType is not found in task object")
# We provide the ability to deserialize custom objects, because the output of this
# will be passed directly to the orchestrator as the output of some activity
if (event_type == HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED
and directive_result.Result is not None):
return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
if (event_type == HistoryEventType.TASK_COMPLETED
and directive_result.Result is not None):
return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
if (event_type == HistoryEventType.EVENT_RAISED
and directive_result.Input is not None):
# TODO: Investigate why the payload is in "Input" instead of "Result"
response = json.loads(directive_result.Input,
object_hook=_deserialize_custom_object)
return response
return None
# get target task
key = getattr(event, id_key)
try:
task: Union[TaskBase, List[TaskBase]] = self.context.open_tasks.pop(key)
if isinstance(task, list):
task_list = task
task = task_list.pop()
if len(task_list) > 0:
self.context.open_tasks[key] = task_list
except KeyError:
warning = f"Potential duplicate Task completion for TaskId: {key}"
warnings.warn(warning)
self.context.deferred_tasks[key] = lambda: self.set_task_value(
event, is_success, id_key)
return
if is_success:
# retrieve result
new_value = parse_history_event(event)
if task._api_name == "CallEntityAction":
event_payload = ResponseMessage.from_dict(new_value)
new_value = json.loads(event_payload.result)
if event_payload.is_exception:
new_value = Exception(new_value)
is_success = False
else:
# generate exception
new_value = Exception(f"{event.Reason} \n {event.Details}")
# with a yielded task now evaluated, we can try to resume the user code
task.set_is_played(event._is_played)
task.set_value(is_error=not is_success, value=new_value)