in src/orchestrations/TaskOrchestrationExecutor.ts [236:348]
private setTaskValue(event: HistoryEvent, isSuccess: boolean, idKey: string): void {
/**
* @hidden
*
* Extracts a task's result from its corresponding History event
* @param completionEvent
* The History event corresponding to the task's completion
* @returns
* The task's result
*/
function extractResult(completionEvent: HistoryEvent): unknown {
let taskResult: unknown;
switch (completionEvent.EventType) {
case HistoryEventType.SubOrchestrationInstanceCompleted:
taskResult = JSON.parse(
(completionEvent as SubOrchestrationInstanceCompletedEvent).Result
);
break;
case HistoryEventType.TaskCompleted:
taskResult = JSON.parse((completionEvent as TaskCompletedEvent).Result);
break;
case HistoryEventType.EventRaised:
const eventRaised = completionEvent as EventRaisedEvent;
taskResult =
eventRaised && eventRaised.Input
? JSON.parse(eventRaised.Input)
: undefined;
break;
default:
break;
}
return taskResult;
}
// First, we attempt to recover the task associated with this history event
let task: TaskBase | undefined;
const key = event[idKey as keyof typeof event];
if (typeof key === "number" || typeof key === "string") {
task = this.openTasks[key];
const taskList: TaskBase[] | undefined = this.openEvents[key];
if (task !== undefined) {
// Remove task from open tasks
delete this.openTasks[key];
} else if (taskList !== undefined) {
task = taskList.pop() as TaskBase;
// We ensure openEvents only has an entry for this key if
// there's at least 1 task to consume
if (taskList.length == 0) {
delete this.openEvents[key];
}
} else {
// If the task is in neither open tasks nor open events, then it must
// correspond to the response of an external event that we have yet to wait for.
// We track this by deferring the assignment of this task's result until after the task
// is scheduled.
const updateTask = function (): void {
this.setTaskValue(event, isSuccess, idKey);
return; // we return because the task is yet to be scheduled
};
this.deferredTasks[key] = updateTask.bind(this);
return;
}
} else {
throw Error(
`Task with ID ${key} could not be retrieved from due to its ID-key being of type ${typeof key}. ` +
`We expect ID-keys to be of type number or string. ` +
`This is probably a replay failure, please file a bug report.`
);
}
// After obtaining the task, we obtain its result.
let taskResult: unknown;
if (isSuccess) {
// We obtain the task's result value from its corresponding History event.
taskResult = extractResult(event);
// CallEntity tasks need to further de-serialize its value from the
// History event, we handle that next.
const action = task.actionObj;
if (action instanceof CallEntityAction) {
const eventPayload = new ResponseMessage(taskResult);
taskResult = eventPayload.result ? JSON.parse(eventPayload.result) : undefined;
// Due to how ResponseMessage events are serialized, we can only
// determine if they correspond to a failure at this point in
// processing. As a result, we flip the "isSuccess" flag here
// if an exception is detected.
if (eventPayload.exceptionType !== undefined) {
taskResult = Error(taskResult as string);
isSuccess = false;
}
}
} else {
// The task failed, we attempt to extract the Reason and Details from the event.
if (
Utils.hasStringProperty(event, "Reason") &&
Utils.hasStringProperty(event, "Details")
) {
taskResult = new Error(`${event.Reason} \n ${event.Details}`);
} else {
throw Error(
`Task with ID ${task.id} failed but we could not parse its exception data.` +
`This is probably a replay failure, please file a bug report.`
);
}
}
// Set result to the task, and update it's isPlayed flag.
task.isPlayed = event.IsPlayed;
task.setValue(!isSuccess, taskResult, this);
}