public async Task CompleteTaskOrchestrationWorkItemAsync()

in src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs [204:366]


        /// <summary>
        /// Completes an orchestration work item. Inside a single state manager transaction this sends the
        /// outbound activity messages and timer messages, appends orchestrator and continued-as-new messages,
        /// completes the messages locked by the session, stores the session runtime state and, for
        /// orchestrations that are not complete, writes the orchestration state to the instance store.
        /// After the transaction commits it finalizes the sent batches, enqueues the target sessions and,
        /// when the orchestration is complete, invokes the completed-orchestration handling.
        /// </summary>
        /// <param name="workItem">Work item being completed. Its instance id selects the session and its runtime state is the fallback state to persist.</param>
        /// <param name="newOrchestrationRuntimeState">Runtime state to persist for the session. May be <c>null</c>, in which case the runtime state of <paramref name="workItem"/> is persisted instead.</param>
        /// <param name="outboundMessages">Messages sent through the activities provider. May be <c>null</c> or empty.</param>
        /// <param name="orchestratorMessages">Messages appended through the orchestration provider; their orchestration instances are enqueued after commit. May be <c>null</c> or empty.</param>
        /// <param name="timerMessages">Messages sent through the scheduled messages provider. May be <c>null</c> or empty.</param>
        /// <param name="continuedAsNewMessage">Message appended through the orchestration provider when not <c>null</c>.</param>
        /// <param name="orchestrationState">State written to the instance store when one is configured and the orchestration is not complete. It is mutated to <see cref="OrchestrationStatus.Failed"/> if the transaction fails with <see cref="FabricReplicationOperationTooLargeException"/>.</param>
        /// <returns>A task that completes when the work item has been processed.</returns>
        public async Task CompleteTaskOrchestrationWorkItemAsync(
            TaskOrchestrationWorkItem workItem,
            OrchestrationRuntimeState newOrchestrationRuntimeState,
            IList<TaskMessage> outboundMessages,
            IList<TaskMessage> orchestratorMessages,
            IList<TaskMessage> timerMessages,
            TaskMessage continuedAsNewMessage,
            OrchestrationState orchestrationState)
        {
            SessionInformation sessionInfo = GetSessionInfo(workItem.InstanceId);
            bool isComplete = false;

            try
            {
                var orchestrationStatus = workItem.OrchestrationRuntimeState.OrchestrationStatus;
                ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(
                    workItem.InstanceId,
                    workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
                    $"Current orchestration status: {orchestrationStatus}");
                isComplete = this.IsOrchestrationComplete(orchestrationStatus);
            }
            catch (InvalidOperationException ex)
            {
                // OrchestrationRuntimeState.OrchestrationStatus throws 'InvalidOperationException' if 'ExecutionStartedEvent' is missing.
                // Do not process the orchestration workitem if 'ExecutionStartedEvent' is missing.
                // This can happen when an orchestration message like ExecutionTerminatedEvent is sent to an already finished orchestration
                if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null)
                {
                    ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"InstanceId: '{workItem.InstanceId}', exception: {ex}. Dropping the bad orchestration to avoid noise.");
                    await this.DropOrchestrationAsync(workItem);

                    // The orchestration has been dropped; stop here. Falling through would read
                    // OrchestrationStatus again inside the transaction and hit the same exception.
                    return;
                }

                // Any other InvalidOperationException is not handled here: processing continues
                // with isComplete left as false.
            }

            IList<OrchestrationInstance> sessionsToEnqueue = null;
            List<Message<Guid, TaskMessageItem>> scheduledMessages = null;
            List<Message<string, TaskMessageItem>> activityMessages = null;

            await RetryHelper.ExecuteWithRetryOnTransient(async () =>
            {
                bool retryOnException;
                do
                {
                    try
                    {
                        // Reset the per-attempt results so that a retried attempt does not report
                        // batches or sessions that belonged to a failed attempt.
                        retryOnException = false;
                        sessionsToEnqueue = null;
                        scheduledMessages = null;
                        activityMessages = null;

                        using (var txn = this.stateManager.CreateTransaction())
                        {
                            if (outboundMessages?.Count > 0)
                            {
                                activityMessages = outboundMessages.Select(m => new Message<string, TaskMessageItem>(Guid.NewGuid().ToString(), new TaskMessageItem(m))).ToList();
                                await this.activitiesProvider.SendBatchBeginAsync(txn, activityMessages);
                            }

                            if (timerMessages?.Count > 0)
                            {
                                scheduledMessages = timerMessages.Select(m => new Message<Guid, TaskMessageItem>(Guid.NewGuid(), new TaskMessageItem(m))).ToList();
                                await this.scheduledMessagesProvider.SendBatchBeginAsync(txn, scheduledMessages);
                            }

                            if (orchestratorMessages?.Count > 0)
                            {
                                // Commenting this code to allow nested (multi-level) suborchestrations
                                // If the suborchestration has orchestration messages then we need process them under suborchestration's
                                // session provider except for another suborchestration called from current suborchestration.

                                //if (workItem.OrchestrationRuntimeState?.ParentInstance != null)
                                //{
                                //    sessionsToEnqueue = await this.orchestrationProvider.TryAppendMessageBatchAsync(txn, orchestratorMessages.Select(tm => new TaskMessageItem(tm)));
                                //}
                                //else
                                {
                                    await this.orchestrationProvider.AppendMessageBatchAsync(txn, orchestratorMessages.Select(tm => new TaskMessageItem(tm)));
                                    sessionsToEnqueue = orchestratorMessages.Select(m => m.OrchestrationInstance).ToList();
                                }
                            }

                            if (continuedAsNewMessage != null)
                            {
                                await this.orchestrationProvider.AppendMessageAsync(txn, new TaskMessageItem(continuedAsNewMessage));
                                sessionsToEnqueue = new List<OrchestrationInstance>() { continuedAsNewMessage.OrchestrationInstance };
                            }

                            await this.orchestrationProvider.CompleteMessages(txn, sessionInfo.Instance, sessionInfo.LockTokens);

                            if (workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
                            {
                                await HandleCompletedOrchestrationAsync(workItem);
                            }

                            // When an orchestration is completed, we need to drop the session which involves 2 steps (1) Removing the row from sessions
                            // (2) Dropping the session messages dictionary. The second step is done in background thread for performance so is not
                            // part of transaction. Since it will happen outside the transaction, if this transaction fails for some reason and we dropped
                            // the session as part of this transaction, we wouldn't have updated the session state but would have lost the messages
                            // in the session messages dictionary which are needed for state to reach complete state (when the orchestration is picked up again in next fetch).
                            // So we don't want to drop session as part of this transaction.
                            // Instead, we drop the session as part of a subsequent different transaction.
                            // However, framework passes us 'null' value for 'newOrchestrationRuntimeState' when orchestration is completed and
                            // if we updated the session state to null and this transaction succeeded, and a node failures occurs and we
                            // never call the subsequent transaction, we will lose the runtime state of orchestration and never will be able to
                            // mark it as complete even if it is. So we use the work item's runtime state when 'newOrchestrationRuntimeState' is null
                            // so that the latest state is what is stored for the session.
                            // As part of next transaction, we are going to remove the row anyway for the session and it doesn't matter to update it to 'null'.

                            // Resolve the fallback once and take the instance from the same object:
                            // 'newOrchestrationRuntimeState' may be null (see above, and it is reset to null
                            // on the replication-too-large retry below), so it must not be dereferenced directly.
                            OrchestrationRuntimeState runtimeStateToPersist = newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState;
                            await this.orchestrationProvider.UpdateSessionState(txn, runtimeStateToPersist.OrchestrationInstance, runtimeStateToPersist);

                            // We skip writing to instanceStore when orchestration reached terminal state to avoid a minor timing issue that
                            // wait for an orchestration completes but another orchestration with the same name cannot be started immediately
                            // because the session is still in store. We update the instance store on orchestration completion and drop the
                            // session as part of the next atomic transaction.
                            if (this.instanceStore != null && orchestrationState != null && !isComplete)
                            {
                                await this.instanceStore.WriteEntitiesAsync(txn, new InstanceEntityBase[]
                                {
                                    new OrchestrationStateInstanceEntity()
                                    {
                                        State = orchestrationState
                                    }
                                });
                            }
                            await txn.CommitAsync();
                        }
                    }
                    catch (FabricReplicationOperationTooLargeException ex)
                    {
                        // The transaction payload was too large to replicate. Retry with the new runtime state
                        // and the outgoing messages discarded, and record the orchestration as failed.
                        ServiceFabricProviderEventSource.Tracing.ExceptionInReliableCollectionOperations($"OrchestrationInstance = {sessionInfo.Instance}, Action = {nameof(CompleteTaskOrchestrationWorkItemAsync)}", ex.ToString());
                        retryOnException = true;
                        newOrchestrationRuntimeState = null;
                        outboundMessages = null;
                        timerMessages = null;
                        orchestratorMessages = null;
                        if (orchestrationState != null)
                        {
                            orchestrationState.OrchestrationStatus = OrchestrationStatus.Failed;
                            orchestrationState.Output = $"Fabric exception when trying to process orchestration: {ex}. Investigate and consider reducing the serialization size of orchestration inputs/outputs/overall length to avoid the issue.";
                        }
                    }
                } while (retryOnException);
            }, uniqueActionIdentifier: $"OrchestrationId = '{workItem.InstanceId}', Action = '{nameof(CompleteTaskOrchestrationWorkItemAsync)}'");

            // Post-commit steps: these run only after the transaction above has committed.
            if (activityMessages != null)
            {
                this.activitiesProvider.SendBatchComplete(activityMessages);
            }
            if (scheduledMessages != null)
            {
                this.scheduledMessagesProvider.SendBatchComplete(scheduledMessages);
            }
            if (sessionsToEnqueue != null)
            {
                foreach (var instance in sessionsToEnqueue)
                {
                    this.orchestrationProvider.TryEnqueueSession(instance);
                }
            }

            if (isComplete)
            {
                await HandleCompletedOrchestrationAsync(workItem);
            }
        }