def set_task_value()

in azure/durable_functions/models/TaskOrchestrationExecutor.py [0:0]


    def set_task_value(self, event: HistoryEvent, is_success: bool, id_key: str):
        """Set a running task to either a success or failed state, and sets its value.

        Parameters
        ----------
        event : HistoryEvent
            The history event containing the value for the Task
        is_success : bool
            Whether the Task succeeded or failed (throws exception)
        id_key : str
            The attribute in the event object containing the ID of the Task to target
        """

        def parse_history_event(directive_result):
            """Based on the type of event, parse the JSON.serializable portion of the event."""
            event_type = directive_result.event_type
            if event_type is None:
                raise ValueError("EventType is not found in task object")

            # We provide the ability to deserialize custom objects, because the output of this
            # will be passed directly to the orchestrator as the output of some activity
            if (event_type == HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED
                    and directive_result.Result is not None):
                return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
            if (event_type == HistoryEventType.TASK_COMPLETED
                    and directive_result.Result is not None):
                return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
            if (event_type == HistoryEventType.EVENT_RAISED
                    and directive_result.Input is not None):
                # TODO: Investigate why the payload is in "Input" instead of "Result"
                response = json.loads(directive_result.Input,
                                      object_hook=_deserialize_custom_object)
                return response
            return None

        # get target task
        key = getattr(event, id_key)
        try:
            task: Union[TaskBase, List[TaskBase]] = self.context.open_tasks.pop(key)
            if isinstance(task, list):
                task_list = task
                task = task_list.pop()
                if len(task_list) > 0:
                    self.context.open_tasks[key] = task_list
        except KeyError:
            warning = f"Potential duplicate Task completion for TaskId: {key}"
            warnings.warn(warning)
            self.context.deferred_tasks[key] = lambda: self.set_task_value(
                event, is_success, id_key)
            return

        if is_success:
            # retrieve result
            new_value = parse_history_event(event)
            if task._api_name == "CallEntityAction":
                event_payload = ResponseMessage.from_dict(new_value)
                new_value = json.loads(event_payload.result)

                if event_payload.is_exception:
                    new_value = Exception(new_value)
                    is_success = False
        else:
            # generate exception
            new_value = Exception(f"{event.Reason} \n {event.Details}")

        # with a yielded task now evaluated, we can try to resume the user code
        task.set_is_played(event._is_played)
        task.set_value(is_error=not is_success, value=new_value)