in src/DurableTask.AzureStorage/OrchestrationSessionManager.cs [559:638]
public async Task<OrchestrationSession?> GetNextSessionAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
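    // Entity batches and orchestration batches are tracked in separate ready queues;
    // pick the one that matches what this caller is asking for.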
    var readyForProcessingQueue = entitiesOnly ? this.entitiesReadyForProcessingQueue : this.orchestrationsReadyForProcessingQueue;

    while (!cancellationToken.IsCancellationRequested)
    {
        // This call will block until:
        // 1) a batch of messages has been received for a particular instance and
        // 2) the history for that instance has been fetched
        LinkedListNode<PendingMessageBatch> node = await readyForProcessingQueue.DequeueAsync(cancellationToken);

        lock (this.messageAndSessionLock)
        {
            PendingMessageBatch nextBatch = node.Value;
            this.pendingOrchestrationMessageBatches.Remove(node);
            if (!this.activeOrchestrationSessions.TryGetValue(nextBatch.OrchestrationInstanceId, out var existingSession))
            {
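                // No session is active for this instance yet, so create one from the
                // prefetched state and message batch and hand it to the caller.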
                OrchestrationInstance instance = nextBatch.OrchestrationState?.OrchestrationInstance ??
                    new OrchestrationInstance
                    {
                        InstanceId = nextBatch.OrchestrationInstanceId,
                        ExecutionId = nextBatch.OrchestrationExecutionId,
                    };

                Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: true);

                OrchestrationSession session = new OrchestrationSession(
                    this.settings,
                    this.storageAccountName,
                    instance,
                    nextBatch.ControlQueue,
                    nextBatch.Messages,
                    nextBatch.OrchestrationState,
                    nextBatch.ETag,
                    nextBatch.LastCheckpointTime,
                    nextBatch.TrackingStoreContext,
                    this.settings.ExtendedSessionIdleTimeout,
                    traceActivityId);

                this.activeOrchestrationSessions.Add(instance.InstanceId, session);

                return session;
            }
            else if (nextBatch.OrchestrationExecutionId == existingSession.Instance?.ExecutionId)
            {
                // There is already an active session with the same execution ID.
                // That session might be waiting for more messages; if so, this call signals it.
                existingSession.AddOrReplaceMessages(node.Value.Messages);
            }
            else
            {
                // A message arrived for a different generation of an existing orchestration instance.
                // Put it back into the ready queue so that it can be processed once the current generation
                // is done executing.
                if (readyForProcessingQueue.Count == 0)
                {
                    // To avoid a tight dequeue loop, delay for a bit before putting this node back into the queue.
                    // This is only necessary when the queue is empty. The main dequeue thread must not be blocked
                    // by this delay, which is why we use Task.Delay(...).ContinueWith(...) instead of await.
                    Task.Delay(millisecondsDelay: 200).ContinueWith(_ =>
                    {
                        lock (this.messageAndSessionLock)
                        {
                            this.pendingOrchestrationMessageBatches.AddLast(node);
                            readyForProcessingQueue.Enqueue(node);
                        }
                    });
                }
                else
                {
                    this.pendingOrchestrationMessageBatches.AddLast(node);
                    readyForProcessingQueue.Enqueue(node);
                }
            }
        }
    }
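    // Reached only after cancellation was requested and the dequeue loop exited.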
    return null;
}
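
For context, below is a minimal sketch of a dispatch loop that could sit on top of GetNextSessionAsync. Only GetNextSessionAsync, OrchestrationSession, and OrchestrationSessionManager come from the code above (they are internal types, so the sketch assumes it lives in the same assembly); the Worker class, its ProcessSessionAsync placeholder, and the fire-and-forget hand-off are hypothetical illustrations, not the library's actual dispatcher.

using System.Threading;
using System.Threading.Tasks;

class Worker
{
    readonly OrchestrationSessionManager sessionManager;

    public Worker(OrchestrationSessionManager sessionManager)
    {
        this.sessionManager = sessionManager;
    }

    public async Task RunAsync(CancellationToken shutdownToken)
    {
        while (!shutdownToken.IsCancellationRequested)
        {
            // Blocks until a message batch and its history are ready; returns null
            // only after cancellation has been requested.
            OrchestrationSession? session = await this.sessionManager.GetNextSessionAsync(
                entitiesOnly: false,
                cancellationToken: shutdownToken);

            if (session == null)
            {
                break;
            }

            // Hand the session to a worker task so this loop keeps pulling work.
            // While the session remains registered as active, later batches for the
            // same execution are routed to it via AddOrReplaceMessages instead of
            // producing a second session.
            _ = Task.Run(() => this.ProcessSessionAsync(session, shutdownToken), shutdownToken);
        }
    }

    Task ProcessSessionAsync(OrchestrationSession session, CancellationToken cancellationToken)
    {
        // Hypothetical placeholder: run the orchestration episode for the session's
        // message batch here, then release the session.
        return Task.CompletedTask;
    }
}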