in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [533:611]
public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
var session = await this.orchestratorSessionClient.AcceptMessageSessionAsync(receiveTimeout);
if (session == null)
{
return null;
}
this.ServiceStats.OrchestrationDispatcherStats.SessionsReceived.Increment();
// TODO : Here and elsewhere, consider standard retry block instead of our own hand rolled version
IList<Message> newMessages =
(await Utils.ExecuteWithRetries(() => session.ReceiveAsync(this.Settings.PrefetchCount),
session.SessionId, "Receive Session Message Batch", this.Settings.MaxRetries, this.Settings.IntervalBetweenRetriesSecs)).Cast<Message>().ToList();
this.ServiceStats.OrchestrationDispatcherStats.MessagesReceived.Increment(newMessages.Count);
TraceHelper.TraceSession(
TraceEventType.Information,
"ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-MessageToProcess",
session.SessionId,
GetFormattedLog(
$@"{newMessages.Count} new messages to process: {
string.Join(",", newMessages.Select(m => m.MessageId))}, max latency: {
newMessages.Max(message => message.DeliveryLatency())}ms"));
ServiceBusUtils.CheckAndLogDeliveryCount(session.SessionId, newMessages, this.Settings.MaxTaskOrchestrationDeliveryCount);
IList<TaskMessage> newTaskMessages = await Task.WhenAll(
newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message, this.BlobStore)));
OrchestrationRuntimeState runtimeState = await GetSessionStateAsync(session, this.BlobStore);
long maxSequenceNumber = newMessages.Max(message => message.SystemProperties.SequenceNumber);
Dictionary<string, Message> lockTokens = newMessages.ToDictionary(m => m.SystemProperties.LockToken.ToString(), m => m);
var sessionState = new ServiceBusOrchestrationSession
{
Session = session,
LockTokens = lockTokens,
SequenceNumber = maxSequenceNumber
};
if (!this.orchestrationSessions.TryAdd(session.SessionId, sessionState))
{
string error = $"Duplicate orchestration session id '{session.SessionId}', id already exists in session list.";
TraceHelper.Trace(TraceEventType.Error, "ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-DuplicateSessionId", error);
throw new OrchestrationFrameworkException(error);
}
if (this.InstanceStore != null)
{
try
{
TaskMessage executionStartedMessage = newTaskMessages.FirstOrDefault(m => m.Event is ExecutionStartedEvent);
if (executionStartedMessage != null)
{
await UpdateInstanceStoreAsync(executionStartedMessage.Event as ExecutionStartedEvent, maxSequenceNumber);
}
}
catch (Exception exception)
{
this.orchestrationSessions.TryRemove(session.SessionId, out ServiceBusOrchestrationSession _);
string error = $"Exception while updating instance store. Session id: {session.SessionId}";
TraceHelper.TraceException(TraceEventType.Error, "ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-ErrorUpdatingInstanceStore", exception, error);
throw;
}
}
return new TaskOrchestrationWorkItem
{
InstanceId = session.SessionId,
LockedUntilUtc = session.LockedUntilUtc,
NewMessages = newTaskMessages.ToList(),
OrchestrationRuntimeState = runtimeState
};
}