in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [690:862]
/// <summary>
/// Completes an orchestration work item. Inside a single <see cref="TransactionScope"/> this method
/// attempts to store the session state, sends the outbound (worker), timer, orchestrator,
/// continue-as-new and tracking messages, completes the session messages held by the work item's
/// session, and finally marks the scope as complete.
/// </summary>
/// <param name="workItem">The work item being completed; supplies the current runtime state and is used to look up the session instance.</param>
/// <param name="newOrchestrationRuntimeState">The runtime state handed to TrySetSessionStateAsync. Timer messages are only sent when this is non-null.</param>
/// <param name="outboundMessages">Messages sent through the worker sender; may be null or empty.</param>
/// <param name="orchestratorMessages">Messages sent to the orchestrator queue with the "Sub Orchestration" label; may be null or empty.</param>
/// <param name="timerMessages">Timer messages sent to the orchestrator queue, scheduled at each event's FireAt time; may be null or empty.</param>
/// <param name="continuedAsNewMessage">Optional message sent to the orchestrator queue with the "Continue as new" label; skipped when null.</param>
/// <param name="orchestrationState">Not read anywhere in this method body.</param>
/// <returns>A task that completes once the messages are sent, the session messages are completed and the scope is marked complete.</returns>
/// <exception cref="ArgumentNullException">Thrown when no session instance is found for <paramref name="workItem"/>.</exception>
public async Task CompleteTaskOrchestrationWorkItemAsync(
TaskOrchestrationWorkItem workItem,
OrchestrationRuntimeState newOrchestrationRuntimeState,
IList<TaskMessage> outboundMessages,
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
{
// runtimeState is the state the work item was loaded with; newOrchestrationRuntimeState may differ from it.
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
ServiceBusOrchestrationSession sessionState = GetSessionInstanceForWorkItem(workItem);
if (sessionState == null)
{
// ReSharper disable once NotResolvedInText
throw new ArgumentNullException("SessionInstance");
}
var session = sessionState.Session;
// AsyncFlowOption.Enabled lets the ambient transaction flow across the awaits below.
// NOTE(review): presumably the sends and the session completion enlist in this ambient transaction — confirm against the Service Bus client in use.
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
// Trace the final outcome of the transaction: Information when committed, Error for any other status.
Transaction.Current.TransactionCompleted += (o, e) =>
TraceHelper.TraceInstance(
e.Transaction.TransactionInformation.Status == TransactionStatus.Committed ? TraceEventType.Information : TraceEventType.Error,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TransactionComplete",
runtimeState.OrchestrationInstance,
() => $@"Orchestration Transaction Completed {
e.Transaction.TransactionInformation.LocalIdentifier
} status: {
e.Transaction.TransactionInformation.Status}");
TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-CreateTransaction",
runtimeState.OrchestrationInstance,
() => $@"Created new Orchestration Transaction - txnid: {
Transaction.Current.TransactionInformation.LocalIdentifier
}");
// Messages are only sent when the session state update reports success; on false the whole
// send section is skipped, but the session messages below are still completed.
if (await TrySetSessionStateAsync(workItem, newOrchestrationRuntimeState, runtimeState, session))
{
// 1) Outbound messages: converted concurrently, then sent as one batch through the worker sender.
if (outboundMessages?.Count > 0)
{
MessageContainer[] outboundBrokeredMessages = await Task.WhenAll(outboundMessages.Select(async m =>
{
// No orchestration instance is passed (null) for worker outbound messages.
Message message = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
m,
this.Settings.MessageCompressionSettings,
this.Settings.MessageSettings,
null,
"Worker outbound message",
this.BlobStore,
DateTimeUtils.MinDateTime);
return new MessageContainer(message, m);
}));
await this.workerSender.SendAsync(outboundBrokeredMessages.Select(m => m.Message).ToList());
LogSentMessages(session, "Worker outbound", outboundBrokeredMessages);
this.ServiceStats.ActivityDispatcherStats.MessageBatchesSent.Increment();
this.ServiceStats.ActivityDispatcherStats.MessagesSent.Increment(outboundMessages.Count);
}
// 2) Timer messages: skipped entirely when there is no new runtime state, because the
// new state's OrchestrationInstance is needed to build each message.
if (timerMessages?.Count > 0 && newOrchestrationRuntimeState != null)
{
MessageContainer[] timerBrokeredMessages = await Task.WhenAll(timerMessages.Select(async m =>
{
// Assumes every timer message carries a TimerFiredEvent; the direct cast throws otherwise.
DateTime messageFireTime = ((TimerFiredEvent) m.Event).FireAt;
Message message = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
m,
this.Settings.MessageCompressionSettings,
this.Settings.MessageSettings,
newOrchestrationRuntimeState.OrchestrationInstance,
"Timer Message",
this.BlobStore,
messageFireTime);
// Schedule the message to be enqueued at the timer's fire time.
message.ScheduledEnqueueTimeUtc = messageFireTime;
return new MessageContainer(message, m);
}));
await this.orchestratorQueueClient.SendAsync(timerBrokeredMessages.Select(m => m.Message).ToList());
LogSentMessages(session, "Timer Message", timerBrokeredMessages);
this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();
this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(timerMessages.Count);
}
// 3) Orchestrator messages: each is built with its own OrchestrationInstance and sent to the orchestrator queue.
if (orchestratorMessages?.Count > 0)
{
MessageContainer[] orchestrationBrokeredMessages = await Task.WhenAll(orchestratorMessages.Select(async m =>
{
Message message = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
m,
this.Settings.MessageCompressionSettings,
this.Settings.MessageSettings,
m.OrchestrationInstance,
"Sub Orchestration",
this.BlobStore,
DateTimeUtils.MinDateTime);
return new MessageContainer(message, m);
}));
await this.orchestratorQueueClient.SendAsync(orchestrationBrokeredMessages.Select(m => m.Message).ToList());
LogSentMessages(session, "Sub Orchestration", orchestrationBrokeredMessages);
this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();
this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment(orchestratorMessages.Count);
}
// 4) Continue-as-new: a single message sent to the orchestrator queue. The new runtime state
// may be null here, in which case a null instance is passed to the message builder.
if (continuedAsNewMessage != null)
{
Message continuedAsNewBrokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
continuedAsNewMessage,
this.Settings.MessageCompressionSettings,
this.Settings.MessageSettings,
newOrchestrationRuntimeState?.OrchestrationInstance,
"Continue as new",
this.BlobStore,
DateTimeUtils.MinDateTime);
await this.orchestratorQueueClient.SendAsync(continuedAsNewBrokeredMessage);
// The container used for logging is built without the originating TaskMessage (second argument is null).
LogSentMessages(session, "Continue as new", new List<MessageContainer> { new MessageContainer(continuedAsNewBrokeredMessage, null) });
this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();
this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment();
}
// 5) Tracking messages: only produced when an instance store is configured.
if (this.InstanceStore != null)
{
// First pass: tracking messages for the runtime state the work item was loaded with.
List<MessageContainer> trackingMessages = await CreateTrackingMessagesAsync(runtimeState, sessionState.SequenceNumber);
TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages",
runtimeState.OrchestrationInstance,
"Created {0} tracking messages", trackingMessages.Count);
if (trackingMessages.Count > 0)
{
await this.trackingSender.SendAsync(trackingMessages.Select(m => m.Message).ToList());
LogSentMessages(session, "Tracking messages", trackingMessages);
this.ServiceStats.TrackingDispatcherStats.MessageBatchesSent.Increment();
this.ServiceStats.TrackingDispatcherStats.MessagesSent.Increment(trackingMessages.Count);
}
// Second pass: when the new runtime state is non-null and differs from the original one,
// tracking messages are created and sent for it as well.
// NOTE(review): presumably this is the continue-as-new case — confirm against callers.
if (newOrchestrationRuntimeState != null && runtimeState != newOrchestrationRuntimeState)
{
trackingMessages = await CreateTrackingMessagesAsync(newOrchestrationRuntimeState, sessionState.SequenceNumber);
TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages",
newOrchestrationRuntimeState.OrchestrationInstance,
"Created {0} tracking messages", trackingMessages.Count);
if (trackingMessages.Count > 0)
{
await this.trackingSender.SendAsync(trackingMessages.Select(m => m.Message).ToList());
LogSentMessages(session, "Tracking messages", trackingMessages);
this.ServiceStats.TrackingDispatcherStats.MessageBatchesSent.Increment();
this.ServiceStats.TrackingDispatcherStats.MessagesSent.Increment(trackingMessages.Count);
}
}
}
}
// Runs regardless of the TrySetSessionStateAsync result: trace and complete every message
// tracked in the session's LockTokens, then mark the transaction scope as complete.
TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItemMessages",
runtimeState.OrchestrationInstance,
() =>
{
string allIds = string.Join(" ", sessionState.LockTokens.Values.Select(m => $"[SEQ: {m.SystemProperties.SequenceNumber} LT: {m.SystemProperties.LockToken}]"));
return $"Completing orchestration messages sequence and lock tokens: {allIds}";
});
await session.CompleteAsync(sessionState.LockTokens.Values);
this.ServiceStats.OrchestrationDispatcherStats.SessionBatchesCompleted.Increment();
// Without this call the scope would roll back when disposed.
ts.Complete();
}
}