public async Task CompleteTaskOrchestrationWorkItemAsync()

in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [690:862]


        /// <summary>
        /// Completes an orchestration work item inside a single <see cref="TransactionScope"/>:
        /// attempts to store the new session state, sends the resulting messages, and then
        /// completes the session messages that were locked for this work item.
        /// </summary>
        /// <remarks>
        /// Messages are only sent when <c>TrySetSessionStateAsync</c> returns <c>true</c>. The locked
        /// session messages are completed and the scope is marked complete in either case.
        /// </remarks>
        /// <param name="workItem">Work item being completed; its session instance is looked up from it.</param>
        /// <param name="newOrchestrationRuntimeState">
        /// Runtime state handed to <c>TrySetSessionStateAsync</c>. Its orchestration instance is used for
        /// timer and continue-as-new messages, and it is tracked separately when it differs from the
        /// work item's current runtime state. May be <c>null</c>, in which case timer messages are not sent.
        /// </param>
        /// <param name="outboundMessages">Messages sent through the worker sender; may be <c>null</c> or empty.</param>
        /// <param name="orchestratorMessages">Messages sent to the orchestrator queue; may be <c>null</c> or empty.</param>
        /// <param name="timerMessages">
        /// Messages whose event is cast to <see cref="TimerFiredEvent"/> and scheduled for enqueue at its
        /// <c>FireAt</c> time; may be <c>null</c> or empty.
        /// </param>
        /// <param name="continuedAsNewMessage">Optional message sent to the orchestrator queue; may be <c>null</c>.</param>
        /// <param name="orchestrationState">Not read by this method.</param>
        /// <exception cref="ArgumentNullException">No session instance could be found for <paramref name="workItem"/>.</exception>
        public async Task CompleteTaskOrchestrationWorkItemAsync(
            TaskOrchestrationWorkItem workItem,
            OrchestrationRuntimeState newOrchestrationRuntimeState,
            IList<TaskMessage> outboundMessages,
            IList<TaskMessage> orchestratorMessages,
            IList<TaskMessage> timerMessages,
            TaskMessage continuedAsNewMessage,
            OrchestrationState orchestrationState)
        {
            OrchestrationRuntimeState currentState = workItem.OrchestrationRuntimeState;
            ServiceBusOrchestrationSession orchestrationSession = GetSessionInstanceForWorkItem(workItem);
            if (orchestrationSession == null)
            {
                // ReSharper disable once NotResolvedInText
                throw new ArgumentNullException("SessionInstance");
            }

            var messageSession = orchestrationSession.Session;

            using (var transactionScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                // Trace the final outcome of the ambient transaction: Information on commit, Error otherwise.
                Transaction.Current.TransactionCompleted += (sender, args) =>
                {
                    TraceEventType outcomeLevel =
                        args.Transaction.TransactionInformation.Status == TransactionStatus.Committed
                            ? TraceEventType.Information
                            : TraceEventType.Error;

                    TraceHelper.TraceInstance(
                        outcomeLevel,
                        "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TransactionComplete",
                        currentState.OrchestrationInstance,
                        () => $"Orchestration Transaction Completed {args.Transaction.TransactionInformation.LocalIdentifier} status: {args.Transaction.TransactionInformation.Status}");
                };

                TraceHelper.TraceInstance(
                    TraceEventType.Information,
                    "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-CreateTransaction",
                    currentState.OrchestrationInstance,
                    () => $"Created new Orchestration Transaction - txnid: {Transaction.Current.TransactionInformation.LocalIdentifier}");

                bool sessionStateSaved = await TrySetSessionStateAsync(
                    workItem,
                    newOrchestrationRuntimeState,
                    currentState,
                    messageSession);

                if (sessionStateSaved)
                {
                    // 1. Outbound messages go through the worker sender.
                    if (outboundMessages != null && outboundMessages.Count > 0)
                    {
                        IEnumerable<Task<MessageContainer>> outboundConversions = outboundMessages.Select(async outbound =>
                        {
                            Message brokered = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
                                outbound,
                                this.Settings.MessageCompressionSettings,
                                this.Settings.MessageSettings,
                                null,
                                "Worker outbound message",
                                this.BlobStore,
                                DateTimeUtils.MinDateTime);
                            return new MessageContainer(brokered, outbound);
                        });
                        MessageContainer[] outboundContainers = await Task.WhenAll(outboundConversions);

                        List<Message> outboundPayloads = outboundContainers.Select(container => container.Message).ToList();
                        await this.workerSender.SendAsync(outboundPayloads);
                        LogSentMessages(messageSession, "Worker outbound", outboundContainers);
                        this.ServiceStats.ActivityDispatcherStats.MessageBatchesSent.Increment();
                        this.ServiceStats.ActivityDispatcherStats.MessagesSent.Increment(outboundMessages.Count);
                    }

                    // 2. Timer messages are scheduled on the orchestrator queue for their fire time.
                    bool hasTimers = timerMessages != null && timerMessages.Count > 0;
                    if (hasTimers && newOrchestrationRuntimeState != null)
                    {
                        IEnumerable<Task<MessageContainer>> timerConversions = timerMessages.Select(async timer =>
                        {
                            DateTime fireAt = ((TimerFiredEvent) timer.Event).FireAt;
                            Message brokered = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
                                timer,
                                this.Settings.MessageCompressionSettings,
                                this.Settings.MessageSettings,
                                newOrchestrationRuntimeState.OrchestrationInstance,
                                "Timer Message",
                                this.BlobStore,
                                fireAt);
                            brokered.ScheduledEnqueueTimeUtc = fireAt;
                            return new MessageContainer(brokered, timer);
                        });
                        MessageContainer[] timerContainers = await Task.WhenAll(timerConversions);

                        List<Message> timerPayloads = timerContainers.Select(container => container.Message).ToList();
                        await this.orchestratorQueueClient.SendAsync(timerPayloads);
                        LogSentMessages(messageSession, "Timer Message", timerContainers);
                        this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();
                        this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(timerMessages.Count);
                    }

                    // 3. Orchestrator messages go to the orchestrator queue, addressed to each message's own instance.
                    if (orchestratorMessages != null && orchestratorMessages.Count > 0)
                    {
                        IEnumerable<Task<MessageContainer>> orchestratorConversions = orchestratorMessages.Select(async orchestratorMessage =>
                        {
                            Message brokered = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
                                orchestratorMessage,
                                this.Settings.MessageCompressionSettings,
                                this.Settings.MessageSettings,
                                orchestratorMessage.OrchestrationInstance,
                                "Sub Orchestration",
                                this.BlobStore,
                                DateTimeUtils.MinDateTime);
                            return new MessageContainer(brokered, orchestratorMessage);
                        });
                        MessageContainer[] orchestratorContainers = await Task.WhenAll(orchestratorConversions);

                        List<Message> orchestratorPayloads = orchestratorContainers.Select(container => container.Message).ToList();
                        await this.orchestratorQueueClient.SendAsync(orchestratorPayloads);

                        LogSentMessages(messageSession, "Sub Orchestration", orchestratorContainers);
                        this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();
                        this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(orchestratorMessages.Count);
                    }

                    // 4. A single continue-as-new message, when supplied.
                    if (continuedAsNewMessage != null)
                    {
                        Message continuedAsNewBrokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
                            continuedAsNewMessage,
                            this.Settings.MessageCompressionSettings,
                            this.Settings.MessageSettings,
                            newOrchestrationRuntimeState?.OrchestrationInstance,
                            "Continue as new",
                            this.BlobStore,
                            DateTimeUtils.MinDateTime);
                        await this.orchestratorQueueClient.SendAsync(continuedAsNewBrokeredMessage);

                        var continuedAsNewContainers = new List<MessageContainer>
                        {
                            new MessageContainer(continuedAsNewBrokeredMessage, null)
                        };
                        LogSentMessages(messageSession, "Continue as new", continuedAsNewContainers);
                        this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();
                        this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment();
                    }

                    // 5. Tracking messages, only when an instance store is configured. The current state is
                    //    always tracked; the new state is tracked as well when it is a different, non-null state.
                    if (this.InstanceStore != null)
                    {
                        var statesToTrack = new List<OrchestrationRuntimeState> { currentState };
                        if (newOrchestrationRuntimeState != null && currentState != newOrchestrationRuntimeState)
                        {
                            statesToTrack.Add(newOrchestrationRuntimeState);
                        }

                        foreach (OrchestrationRuntimeState trackedState in statesToTrack)
                        {
                            List<MessageContainer> trackingMessages = await CreateTrackingMessagesAsync(
                                trackedState,
                                orchestrationSession.SequenceNumber);

                            TraceHelper.TraceInstance(
                                TraceEventType.Information,
                                "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages",
                                trackedState.OrchestrationInstance,
                                "Created {0} tracking messages", trackingMessages.Count);

                            if (trackingMessages.Count > 0)
                            {
                                List<Message> trackingPayloads = trackingMessages.Select(container => container.Message).ToList();
                                await this.trackingSender.SendAsync(trackingPayloads);
                                LogSentMessages(messageSession, "Tracking messages", trackingMessages);
                                this.ServiceStats.TrackingDispatcherStats.MessageBatchesSent.Increment();
                                this.ServiceStats.TrackingDispatcherStats.MessagesSent.Increment(trackingMessages.Count);
                            }
                        }
                    }
                }

                TraceHelper.TraceInstance(
                    TraceEventType.Information,
                    "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItemMessages",
                    currentState.OrchestrationInstance,
                    () =>
                    {
                        IEnumerable<string> lockDescriptions = orchestrationSession.LockTokens.Values.Select(
                            locked => $"[SEQ: {locked.SystemProperties.SequenceNumber} LT: {locked.SystemProperties.LockToken}]");
                        return "Completing orchestration messages sequence and lock tokens: " + string.Join(" ", lockDescriptions);
                    });

                // Complete the locked session messages and mark the scope complete, whether or not
                // the session state was stored above.
                await messageSession.CompleteAsync(orchestrationSession.LockTokens.Values);
                this.ServiceStats.OrchestrationDispatcherStats.SessionBatchesCompleted.Increment();
                transactionScope.Complete();
            }
        }