void DetermineWork()

in src/DurableTask.Core/TaskEntityDispatcher.cs [451:603]


        /// <summary>
        /// Replays the history events of an entity scheduler instance to rebuild its <see cref="SchedulerState"/>,
        /// then selects the requests that go into the current batch.
        /// </summary>
        /// <param name="runtimeState">Runtime state whose events and input are replayed.</param>
        /// <param name="schedulerState">Receives the scheduler state after all events were applied and the batch was dequeued.</param>
        /// <param name="batch">Receives the work selected for this batch, plus any messages that must be rescheduled.</param>
        /// <exception cref="EntitySchedulerException">
        /// Thrown when the scheduler state, a request message, or a lock release message cannot be deserialized.
        /// </exception>
        void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState schedulerState, out Work batch)
        {
            string instanceId = runtimeState.OrchestrationInstance.InstanceId;
            schedulerState = new SchedulerState();
            batch = new Work();

            // requests sent by the current lock holder; allocated only once such a request shows up
            Queue<RequestMessage> fromLockHolder = null;

            foreach (HistoryEvent historyEvent in runtimeState.Events)
            {
                var kind = historyEvent.EventType;

                if (kind == EventType.ExecutionStarted)
                {
                    // the input, when present, carries the serialized scheduler state
                    if (runtimeState.Input != null)
                    {
                        PopulateOrThrowSchedulerException(
                            runtimeState.Input,
                            schedulerState,
                            "Failed to deserialize entity scheduler state - may be corrupted or wrong version.");
                    }

                    continue;
                }

                if (kind != EventType.EventRaised)
                {
                    // all other kinds of history events are ignored here
                    continue;
                }

                var raised = (EventRaisedEvent)historyEvent;

                if (EntityMessageEventNames.IsRequestMessage(raised.Name))
                {
                    // an operation request or a lock request has arrived
                    var incoming = new RequestMessage();

                    PopulateOrThrowSchedulerException(
                        raised.Input,
                        incoming,
                        "Failed to deserialize incoming request message - may be corrupted or wrong version.");

                    IEnumerable<RequestMessage> ready;

                    if (!incoming.ScheduledTime.HasValue)
                    {
                        // unscheduled messages pass through the message sorter,
                        // which helps with reordering and duplicate filtering
                        ready = schedulerState.MessageSorter.ReceiveInOrder(incoming, this.entityBackendProperties.EntityMessageReorderWindow);
                    }
                    else if ((incoming.ScheduledTime.Value - DateTime.UtcNow) > TimeSpan.FromMilliseconds(100))
                    {
                        // delivered too early (e.g. the orchestration service limits message delay times):
                        // nothing is delivered now, the message is rescheduled instead of processed
                        ready = Array.Empty<RequestMessage>();
                        batch.AddMessageToBeRescheduled(incoming);
                    }
                    else
                    {
                        // the scheduled time has (almost) arrived, so deliver right away.
                        // Scheduled messages carry no FIFO guarantees, hence the sorter is bypassed.
                        ready = new RequestMessage[] { incoming };
                    }

                    foreach (var delivered in ready)
                    {
                        bool sentByLockHolder =
                            schedulerState.LockedBy != null
                            && schedulerState.LockedBy == delivered.ParentInstanceId;

                        if (!sentByLockHolder)
                        {
                            schedulerState.Enqueue(delivered);
                            continue;
                        }

                        if (fromLockHolder == null)
                        {
                            fromLockHolder = new Queue<RequestMessage>();
                        }

                        fromLockHolder.Enqueue(delivered);
                    }

                    continue;
                }

                if (EntityMessageEventNames.IsReleaseMessage(raised.Name))
                {
                    // a lock release has arrived
                    var release = new ReleaseMessage();

                    PopulateOrThrowSchedulerException(
                        raised.Input,
                        release,
                        "Failed to deserialize lock release message - may be corrupted or wrong version.");

                    // only a release whose parent instance matches the recorded lock holder clears the lock
                    if (schedulerState.LockedBy == release.ParentInstanceId)
                    {
                        this.logHelper.EntityLockReleased(instanceId, release);
                        schedulerState.LockedBy = null;
                    }

                    continue;
                }

                // anything else is a continue message:
                // resume processing of previously queued operations, if any
                schedulerState.Suspended = false;
            }

            // requests from the lock holder are put back so they sit at the front of the queue
            if (fromLockHolder != null)
            {
                schedulerState.PutBack(fromLockHolder);
            }

            if (schedulerState.Suspended)
            {
                return;
            }

            // fill the batch with as many queued requests as possible,
            // stopping at a lock request or once the maximum batch size is reached
            while (schedulerState.MayDequeue())
            {
                if (batch.OperationCount == this.entityBackendProperties.MaxEntityOperationBatchSize)
                {
                    // the batch is full: mark it as to-be-continued so the
                    // remaining requests are picked up after this batch is written back
                    batch.ToBeContinued(schedulerState);
                    return;
                }

                var next = schedulerState.Dequeue();

                if (!next.IsLockRequest)
                {
                    batch.AddOperation(next);
                    continue;
                }

                // a lock request always ends the batch
                batch.AddLockRequest(next);
                return;
            }
        }

        /// <summary>
        /// Populates <paramref name="target"/> from <paramref name="json"/> using the internal serializer settings.
        /// Any failure is wrapped in an <see cref="EntitySchedulerException"/> carrying <paramref name="failureMessage"/>.
        /// </summary>
        static void PopulateOrThrowSchedulerException(string json, object target, string failureMessage)
        {
            try
            {
                JsonConvert.PopulateObject(json, target, Serializer.InternalSerializerSettings);
            }
            catch (Exception exception)
            {
                throw new EntitySchedulerException(failureMessage, exception);
            }
        }