public async Task CompleteTaskOrchestrationWorkItemAsync()

in src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs [1104:1241]


        public async Task CompleteTaskOrchestrationWorkItemAsync(
            TaskOrchestrationWorkItem workItem,
            OrchestrationRuntimeState newOrchestrationRuntimeState,
            IList<TaskMessage> outboundMessages,
            IList<TaskMessage> orchestratorMessages,
            IList<TaskMessage> timerMessages,
            TaskMessage continuedAsNewMessage,
            OrchestrationState orchestrationState)
        {
            // Overview: commits the results of one orchestration execution in three ordered steps:
            //   1. enqueue the new outbound/orchestrator/timer/continue-as-new messages,
            //   2. checkpoint the orchestration history through the tracking store,
            //   3. delete the message batch that triggered this execution.
            // The method returns early (without committing) when no session exists for the instance
            // or when the runtime state reports itself as invalid.
            // A precondition failure (HTTP 412) during the checkpoint is surfaced as a
            // SessionAbortedException; any other checkpoint failure is logged and rethrown.

            // for backwards compatibility, we transform timer timestamps to UTC prior to persisting in Azure Storage.
            // see: https://github.com/Azure/durabletask/pull/1138
            // The lists are null-checked so that a caller passing null for either one does not fail with a
            // NullReferenceException here, before any work has been done.
            // NOTE(review): null lists are forwarded unchanged to DependencyTelemetryStarted and
            // CommitOutboundQueueMessages below - confirm those helpers tolerate null.
            if (orchestratorMessages != null)
            {
                foreach (var orchestratorMessage in orchestratorMessages)
                {
                    Utils.ConvertDateTimeInHistoryEventsToUTC(orchestratorMessage.Event);
                }
            }

            if (timerMessages != null)
            {
                foreach (var timerMessage in timerMessages)
                {
                    Utils.ConvertDateTimeInHistoryEventsToUTC(timerMessage.Event);
                }
            }

            // Everything below operates on the session; without one there is nothing to commit, so bail out.
            OrchestrationSession session;
            if (!this.orchestrationSessionManager.TryGetExistingSession(workItem.InstanceId, out session))
            {
                this.settings.Logger.AssertFailure(
                    this.azureStorageClient.QueueAccountName,
                    this.settings.TaskHubName,
                    $"{nameof(CompleteTaskOrchestrationWorkItemAsync)}: Session for instance {workItem.InstanceId} was not found!");
                return;
            }

            session.StartNewLogicalTraceScope();

            // Prefer the new runtime state when one was supplied; otherwise fall back to the work item's state.
            OrchestrationRuntimeState runtimeState = newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState;

            // Only commit side-effects if the orchestration runtime state is valid (i.e. not corrupted)
            if (!runtimeState.IsValid)
            {
                this.settings.Logger.GeneralWarning(
                    this.azureStorageClient.QueueAccountName,
                    this.settings.TaskHubName,
                    $"{nameof(CompleteTaskOrchestrationWorkItemAsync)}: Discarding execution results because the orchestration state is invalid.",
                    instanceId: workItem.InstanceId);

                // The triggering messages are still deleted, so the discarded results are not reprocessed.
                await this.DeleteMessageBatchAsync(session, session.CurrentMessageBatch);
                return;
            }

            string instanceId = workItem.InstanceId;
            string executionId = runtimeState.OrchestrationInstance?.ExecutionId;
            if (executionId == null)
            {
                // A missing execution id is only logged; processing continues with a null execution id.
                this.settings.Logger.GeneralWarning(
                    this.azureStorageClient.QueueAccountName,
                    this.settings.TaskHubName,
                    $"{nameof(CompleteTaskOrchestrationWorkItemAsync)}: Could not find execution id.",
                    instanceId: instanceId);
            }

            // Correlation
            CorrelationTraceClient.Propagate(() =>
                {
                    // In case of Extended Session, Emit the Dependency Telemetry. 
                    if (workItem.IsExtendedSession)
                    {
                        this.TrackExtendedSessionDependencyTelemetry(session);
                    }
                });

            // Assigned inside the Propagate callback; it remains null if the callback never assigns it.
            TraceContextBase currentTraceContextBaseOnComplete = null;
            CorrelationTraceClient.Propagate(() =>
                currentTraceContextBaseOnComplete = CreateOrRestoreRequestTraceContextWithDependencyTrackingSettings(
                    workItem.TraceContext,
                    orchestrationState,
                    DependencyTelemetryStarted(
                        outboundMessages,
                        orchestratorMessages,
                        timerMessages,
                        continuedAsNewMessage,
                        orchestrationState)));

            // First, add new messages into the queue. If a failure happens after this, duplicate messages will
            // be written after the retry, but the results of those messages are expected to be de-dup'd later.
            // This provider needs to ensure that response messages are not processed until the history a few
            // lines down has been successfully committed.

            await this.CommitOutboundQueueMessages(
                session,
                outboundMessages,
                orchestratorMessages,
                timerMessages,
                continuedAsNewMessage);

            // correlation
            CorrelationTraceClient.Propagate(() =>
                this.TrackOrchestrationRequestTelemetry(
                    currentTraceContextBaseOnComplete,
                    orchestrationState,
                    $"{TraceConstants.Orchestrator} {Utils.GetTargetClassName(session.RuntimeState.ExecutionStartedEvent?.Name)}"));

            // Next, commit the orchestration history updates. This is the actual "checkpoint". Failures after this
            // will result in a duplicate replay of the orchestration with no side-effects.
            try
            {
                // The session's current ETag is passed in, and the ETag returned by the tracking store replaces it.
                session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
                // update the runtime state and execution id stored in the session
                session.UpdateRuntimeState(runtimeState);

                // if we deferred some messages, and the execution id of this instance has changed, redeliver them
                if (session.DeferredMessages.Count > 0
                    && executionId != workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId)
                {
                    // Snapshot the deferred messages before clearing so the copy can be handed off safely.
                    var messages = session.DeferredMessages.ToList();
                    session.DeferredMessages.Clear();
                    this.orchestrationSessionManager.AddMessageToPendingOrchestration(session.ControlQueue, messages, session.TraceActivityId, CancellationToken.None);
                }
            }
            catch (RequestFailedException rfe) when (rfe.Status == (int)HttpStatusCode.PreconditionFailed)
            {
                // Precondition failure is expected to be handled internally and logged as a warning.
                // The orchestration dispatcher will handle this exception by abandoning the work item
                throw new SessionAbortedException("Aborting execution due to failed precondition.", rfe);
            }
            catch (Exception e)
            {
                // TODO: https://github.com/Azure/azure-functions-durable-extension/issues/332
                //       It's possible that history updates may have been partially committed at this point.
                //       If so, what are the implications of this as far as DurableTask.Core are concerned?
                this.settings.Logger.OrchestrationProcessingFailure(
                    this.azureStorageClient.TableAccountName,
                    this.settings.TaskHubName,
                    instanceId,
                    executionId,
                    e.ToString());

                throw;
            }

            // Finally, delete the messages which triggered this orchestration execution. This is the final commit.
            await this.DeleteMessageBatchAsync(session, session.CurrentMessageBatch);
        }