in src/DurableTask.Core/TaskEntityDispatcher.cs [451:603]
void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState schedulerState, out Work batch)
{
string instanceId = runtimeState.OrchestrationInstance.InstanceId;
schedulerState = new SchedulerState();
batch = new Work();
Queue<RequestMessage> lockHolderMessages = null;
foreach (HistoryEvent e in runtimeState.Events)
{
switch (e.EventType)
{
case EventType.ExecutionStarted:
if (runtimeState.Input != null)
{
try
{
// restore the scheduler state from the input
JsonConvert.PopulateObject(runtimeState.Input, schedulerState, Serializer.InternalSerializerSettings);
}
catch (Exception exception)
{
throw new EntitySchedulerException("Failed to deserialize entity scheduler state - may be corrupted or wrong version.", exception);
}
}
break;
case EventType.EventRaised:
EventRaisedEvent eventRaisedEvent = (EventRaisedEvent)e;
if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name))
{
// we are receiving an operation request or a lock request
var requestMessage = new RequestMessage();
try
{
JsonConvert.PopulateObject(eventRaisedEvent.Input, requestMessage, Serializer.InternalSerializerSettings);
}
catch (Exception exception)
{
throw new EntitySchedulerException("Failed to deserialize incoming request message - may be corrupted or wrong version.", exception);
}
IEnumerable<RequestMessage> deliverNow;
if (requestMessage.ScheduledTime.HasValue)
{
if ((requestMessage.ScheduledTime.Value - DateTime.UtcNow) > TimeSpan.FromMilliseconds(100))
{
// message was delivered too early. This can happen e.g. if the orchestration service has limits on the delay times for messages.
// We handle this by rescheduling the message instead of processing it.
deliverNow = Array.Empty<RequestMessage>();
batch.AddMessageToBeRescheduled(requestMessage);
}
else
{
// the message is scheduled to be delivered immediately.
// There are no FIFO guarantees for scheduled messages, so we skip the message sorter.
deliverNow = new RequestMessage[] { requestMessage };
}
}
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = schedulerState.MessageSorter.ReceiveInOrder(requestMessage, this.entityBackendProperties.EntityMessageReorderWindow);
}
foreach (var message in deliverNow)
{
if (schedulerState.LockedBy != null && schedulerState.LockedBy == message.ParentInstanceId)
{
if (lockHolderMessages == null)
{
lockHolderMessages = new Queue<RequestMessage>();
}
lockHolderMessages.Enqueue(message);
}
else
{
schedulerState.Enqueue(message);
}
}
}
else if (EntityMessageEventNames.IsReleaseMessage(eventRaisedEvent.Name))
{
// we are receiving a lock release
var message = new ReleaseMessage();
try
{
// restore the scheduler state from the input
JsonConvert.PopulateObject(eventRaisedEvent.Input, message, Serializer.InternalSerializerSettings);
}
catch (Exception exception)
{
throw new EntitySchedulerException("Failed to deserialize lock release message - may be corrupted or wrong version.", exception);
}
if (schedulerState.LockedBy == message.ParentInstanceId)
{
this.logHelper.EntityLockReleased(instanceId, message);
schedulerState.LockedBy = null;
}
}
else
{
// this is a continue message.
// Resumes processing of previously queued operations, if any.
schedulerState.Suspended = false;
}
break;
}
}
// lock holder messages go to the front of the queue
if (lockHolderMessages != null)
{
schedulerState.PutBack(lockHolderMessages);
}
if (!schedulerState.Suspended)
{
// 2. We add as many requests from the queue to the batch as possible,
// stopping at lock requests or when the maximum batch size is reached
while (schedulerState.MayDequeue())
{
if (batch.OperationCount == this.entityBackendProperties.MaxEntityOperationBatchSize)
{
// we have reached the maximum batch size already
// insert a delay after this batch to ensure write back
batch.ToBeContinued(schedulerState);
break;
}
var request = schedulerState.Dequeue();
if (request.IsLockRequest)
{
batch.AddLockRequest(request);
break;
}
else
{
batch.AddOperation(request);
}
}
}
}