in src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs [697:882]
/// <summary>
/// Waits for the next orchestration session and turns its current message batch into a
/// <see cref="TaskOrchestrationWorkItem"/>.
/// </summary>
/// <param name="entitiesOnly">
/// Forwarded unchanged to <c>orchestrationSessionManager.GetNextSessionAsync</c>.
/// Presumably restricts the result to entity sessions - confirm against the session manager.
/// </param>
/// <param name="cancellationToken">
/// Caller's cancellation token. It is linked with <c>this.shutdownSource.Token</c>, so either one
/// cancels the wait for the next session.
/// </param>
/// <returns>
/// The work item to execute, or <c>null</c> when: no session was returned, the session's control
/// queue has been released, every message in the batch was out of order, the instance is not
/// executable (its messages are discarded or deferred), or the operation was canceled.
/// </returns>
/// <remarks>
/// Any exception other than <see cref="OperationCanceledException"/> is logged and rethrown.
/// </remarks>
async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
    Guid traceActivityId = StartNewLogicalTraceScope(useExisting: true);

    await this.EnsureTaskHubAsync();

    using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, this.shutdownSource.Token))
    {
        // Declared outside the try block so the catch handlers can release whatever was acquired.
        OrchestrationSession session = null;
        TaskOrchestrationWorkItem orchestrationWorkItem = null;

        try
        {
            // Asynchronously waits until the next session is ready.
            session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
            if (session == null)
            {
                return null;
            }

            // Make sure we still own the partition. If not, abandon the session.
            if (session.ControlQueue.IsReleased)
            {
                await this.AbandonAndReleaseSessionAsync(session);
                return null;
            }

            session.StartNewLogicalTraceScope();

            // Allocated lazily: most batches are expected to contain no out-of-order messages.
            List<MessageData> outOfOrderMessages = null;

            foreach (MessageData message in session.CurrentMessageBatch)
            {
                if (session.IsOutOfOrderMessage(message))
                {
                    if (outOfOrderMessages == null)
                    {
                        outOfOrderMessages = new List<MessageData>();
                    }

                    // This can happen if a lease change occurs and a new node receives a message for an
                    // orchestration that has not yet checkpointed its history. We abandon such messages
                    // so that they can be reprocessed after the history checkpoint has completed.
                    this.settings.Logger.ReceivedOutOfOrderMessage(
                        this.azureStorageClient.QueueAccountName,
                        this.settings.TaskHubName,
                        session.Instance.InstanceId,
                        session.Instance.ExecutionId,
                        session.ControlQueue.Name,
                        message.TaskMessage.Event.EventType.ToString(),
                        Utils.GetTaskEventId(message.TaskMessage.Event),
                        message.OriginalQueueMessage.MessageId,
                        message.Episode.GetValueOrDefault(-1),
                        session.LastCheckpointTime);

                    outOfOrderMessages.Add(message);
                }
                else
                {
                    session.TraceProcessingMessage(
                        message,
                        isExtendedSession: false,
                        partitionId: session.ControlQueue.Name);
                }
            }

            if (outOfOrderMessages?.Count > 0)
            {
                // This will also remove the messages from the current batch.
                await this.AbandonMessagesAsync(session, outOfOrderMessages);
            }

            if (session.CurrentMessageBatch.Count == 0)
            {
                // All messages were removed. Release the work item.
                await this.AbandonAndReleaseSessionAsync(session);
                return null;
            }

            // Create or restore Correlation TraceContext
            TraceContextBase currentRequestTraceContext = null;
            CorrelationTraceClient.Propagate(
                () =>
                {
                    bool isReplaying = session.RuntimeState.ExecutionStartedEvent?.IsPlayed ?? false;
                    TraceContextBase parentTraceContext = GetParentTraceContext(session);
                    currentRequestTraceContext = GetRequestTraceContext(isReplaying, parentTraceContext);
                });

            orchestrationWorkItem = new TaskOrchestrationWorkItem
            {
                InstanceId = session.Instance.InstanceId,

                // The lock is only held as long as the earliest message in the batch stays invisible.
                LockedUntilUtc = session.CurrentMessageBatch.Min(msg => msg.OriginalQueueMessage.NextVisibleOn.Value.UtcDateTime),
                NewMessages = session.CurrentMessageBatch.Select(m => m.TaskMessage).ToList(),
                OrchestrationRuntimeState = session.RuntimeState,

                // The session is attached to the work item only when extended sessions are enabled.
                Session = this.settings.ExtendedSessionsEnabled ? session : null,
                TraceContext = currentRequestTraceContext,
            };

            if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, this.settings.AllowReplayingTerminalInstances, out string warningMessage))
            {
                // If all messages belong to the same execution ID, then all of them need to be discarded.
                // However, it's also possible to have messages for *any* execution ID batched together with messages
                // to a *specific* (non-executable) execution ID. Those messages should *not* be discarded since
                // they might be consumable by another orchestration with the same instance id but different execution ID.
                var messagesToDiscard = new List<MessageData>();
                var messagesToAbandon = new List<MessageData>();

                foreach (MessageData msg in session.CurrentMessageBatch)
                {
                    if (msg.TaskMessage.OrchestrationInstance.ExecutionId == session.Instance.ExecutionId)
                    {
                        messagesToDiscard.Add(msg);
                    }
                    else
                    {
                        messagesToAbandon.Add(msg);
                    }
                }

                // If no messages have a matching execution ID, then delete all of them. This means all the
                // messages are external (external events, termination, etc.) and were sent to an instance that
                // doesn't exist or is no longer in a running state.
                if (messagesToDiscard.Count == 0)
                {
                    messagesToDiscard.AddRange(messagesToAbandon);
                    messagesToAbandon.Clear();
                }

                // Add all abandoned messages to the deferred list. These messages will not be deleted right now.
                // If they can be matched with another orchestration, then great. Otherwise they will be deleted
                // the next time they are picked up.
                messagesToAbandon.ForEach(session.DeferMessage);

                // Comma-separated event types of the discarded messages, for the log entry only.
                // string.Join yields an empty string for an empty list, whereas trimming a trailing
                // comma off a StringBuilder would throw in that case.
                string discardedEventTypes = string.Join(
                    ",",
                    messagesToDiscard.Select(msg => msg.TaskMessage.Event.EventType.ToString()));

                this.settings.Logger.DiscardingWorkItem(
                    this.azureStorageClient.QueueAccountName,
                    this.settings.TaskHubName,
                    session.Instance.InstanceId,
                    session.Instance.ExecutionId,
                    orchestrationWorkItem.NewMessages.Count,
                    session.RuntimeState.Events.Count,
                    discardedEventTypes,
                    warningMessage);

                // The instance has already completed or never existed. Delete this message batch.
                await this.DeleteMessageBatchAsync(session, messagesToDiscard);
                await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
                return null;
            }

            return orchestrationWorkItem;
        }
        catch (OperationCanceledException)
        {
            if (session != null)
            {
                // host is shutting down - release any queued messages
                await this.AbandonAndReleaseSessionAsync(session);
            }

            return null;
        }
        catch (Exception e)
        {
            this.settings.Logger.OrchestrationProcessingFailure(
                this.azureStorageClient.QueueAccountName,
                this.settings.TaskHubName,
                session?.Instance.InstanceId ?? string.Empty,
                session?.Instance.ExecutionId ?? string.Empty,
                e.ToString());

            // NOTE(review): when the failure happens after the session was obtained but before the
            // work item was created, nothing in this handler releases the session - confirm whether
            // that is intended or whether the session should be abandoned/released here as well.
            if (orchestrationWorkItem != null)
            {
                // The work-item needs to be released so that it can be retried later.
                await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
            }

            throw;
        }
    }
}