in src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs [1104:1241]
public async Task CompleteTaskOrchestrationWorkItemAsync(
TaskOrchestrationWorkItem workItem,
OrchestrationRuntimeState newOrchestrationRuntimeState,
IList<TaskMessage> outboundMessages,
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
{
// for backwards compatibility, we transform timer timestamps to UTC prior to persisting in Azure Storage.
// see: https://github.com/Azure/durabletask/pull/1138
foreach (var orchestratorMessage in orchestratorMessages)
{
Utils.ConvertDateTimeInHistoryEventsToUTC(orchestratorMessage.Event);
}
foreach (var timerMessage in timerMessages)
{
Utils.ConvertDateTimeInHistoryEventsToUTC(timerMessage.Event);
}
OrchestrationSession session;
if (!this.orchestrationSessionManager.TryGetExistingSession(workItem.InstanceId, out session))
{
this.settings.Logger.AssertFailure(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
$"{nameof(CompleteTaskOrchestrationWorkItemAsync)}: Session for instance {workItem.InstanceId} was not found!");
return;
}
session.StartNewLogicalTraceScope();
OrchestrationRuntimeState runtimeState = newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState;
// Only commit side-effects if the orchestration runtime state is valid (i.e. not corrupted)
if (!runtimeState.IsValid)
{
this.settings.Logger.GeneralWarning(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
$"{nameof(CompleteTaskOrchestrationWorkItemAsync)}: Discarding execution results because the orchestration state is invalid.",
instanceId: workItem.InstanceId);
await this.DeleteMessageBatchAsync(session, session.CurrentMessageBatch);
return;
}
string instanceId = workItem.InstanceId;
string executionId = runtimeState.OrchestrationInstance?.ExecutionId;
if (executionId == null)
{
this.settings.Logger.GeneralWarning(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
$"{nameof(CompleteTaskOrchestrationWorkItemAsync)}: Could not find execution id.",
instanceId: instanceId);
}
// Correlation
CorrelationTraceClient.Propagate(() =>
{
// In case of Extended Session, Emit the Dependency Telemetry.
if (workItem.IsExtendedSession)
{
this.TrackExtendedSessionDependencyTelemetry(session);
}
});
TraceContextBase currentTraceContextBaseOnComplete = null;
CorrelationTraceClient.Propagate(() =>
currentTraceContextBaseOnComplete = CreateOrRestoreRequestTraceContextWithDependencyTrackingSettings(
workItem.TraceContext,
orchestrationState,
DependencyTelemetryStarted(
outboundMessages,
orchestratorMessages,
timerMessages,
continuedAsNewMessage,
orchestrationState)));
// First, add new messages into the queue. If a failure happens after this, duplicate messages will
// be written after the retry, but the results of those messages are expected to be de-dup'd later.
// This provider needs to ensure that response messages are not processed until the history a few
// lines down has been successfully committed.
await this.CommitOutboundQueueMessages(
session,
outboundMessages,
orchestratorMessages,
timerMessages,
continuedAsNewMessage);
// correlation
CorrelationTraceClient.Propagate(() =>
this.TrackOrchestrationRequestTelemetry(
currentTraceContextBaseOnComplete,
orchestrationState,
$"{TraceConstants.Orchestrator} {Utils.GetTargetClassName(session.RuntimeState.ExecutionStartedEvent?.Name)}"));
// Next, commit the orchestration history updates. This is the actual "checkpoint". Failures after this
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
// update the runtime state and execution id stored in the session
session.UpdateRuntimeState(runtimeState);
// if we deferred some messages, and the execution id of this instance has changed, redeliver them
if (session.DeferredMessages.Count > 0
&& executionId != workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId)
{
var messages = session.DeferredMessages.ToList();
session.DeferredMessages.Clear();
this.orchestrationSessionManager.AddMessageToPendingOrchestration(session.ControlQueue, messages, session.TraceActivityId, CancellationToken.None);
}
}
catch (RequestFailedException rfe) when (rfe.Status == (int)HttpStatusCode.PreconditionFailed)
{
// Precondition failure is expected to be handled internally and logged as a warning.
// The orchestration dispatcher will handle this exception by abandoning the work item
throw new SessionAbortedException("Aborting execution due to failed precondition.", rfe);
}
catch (Exception e)
{
// TODO: https://github.com/Azure/azure-functions-durable-extension/issues/332
// It's possible that history updates may have been partially committed at this point.
// If so, what are the implications of this as far as DurableTask.Core are concerned?
this.settings.Logger.OrchestrationProcessingFailure(
this.azureStorageClient.TableAccountName,
this.settings.TaskHubName,
instanceId,
executionId,
e.ToString());
throw;
}
// Finally, delete the messages which triggered this orchestration execution. This is the final commit.
await this.DeleteMessageBatchAsync(session, session.CurrentMessageBatch);
}