public IEnumerable ReceiveInOrder()

in src/WebJobs.Extensions.DurableTask/EntityScheduler/MessageSorter.cs [125:267]


        /// <summary>
        /// Runs an incoming message through the per-sender receive buffers and yields every message
        /// that becomes deliverable as a result, in the order it should be delivered.
        /// </summary>
        /// <remarks>
        /// This method is an iterator: nothing below executes until the caller enumerates the result,
        /// and statements that follow a <c>yield return</c> only run if enumeration continues.
        /// When this call advances the receive horizon it also counts the remaining sources and
        /// buffered messages and stores them in <c>NumSources</c> / <c>NumMessages</c>; otherwise both
        /// are set to <c>null</c>. Messages that bypass sorting entirely (first guard below) leave
        /// those two values untouched.
        /// </remarks>
        /// <param name="message">The message that was just received.</param>
        /// <param name="reorderWindow">
        /// How far behind the latest timestamp a message may be and still take part in sorting.
        /// A zero window disables sorting.
        /// </param>
        /// <returns>
        /// Zero or more messages to deliver now. Empty when <paramref name="message"/> is a duplicate
        /// or had to be buffered and nothing else became deliverable.
        /// </returns>
        public IEnumerable<RequestMessage> ReceiveInOrder(RequestMessage message, TimeSpan reorderWindow)
        {
            // messages sent from clients and forwarded lock messages are not participating in the sorting.
            if (reorderWindow.Ticks == 0 || message.ParentInstanceId == null || message.Position > 0)
            {
                // Just pass the message through.
                yield return message;
                yield break;
            }

            // Only true when this call performs a collection pass; the counts below are
            // meaningless otherwise and are reported as null.
            bool countingEntries = false;
            int sourceCount = 0;
            int messageCount = 0;

            // advance the horizon based on the latest timestamp
            if (this.ReceiveHorizon + reorderWindow + MinIntervalBetweenCollections < message.Timestamp)
            {
                countingEntries = true;

                this.ReceiveHorizon = message.Timestamp - reorderWindow;

                // deliver any messages that were held in the receive buffers
                // but are now past the reorder window

                // Keys are collected here and removed after the loop, because the dictionary
                // cannot be modified while it is being enumerated.
                List<string> emptyReceiveBuffers = new List<string>();

                if (this.ReceivedFromInstance != null)
                {
                    foreach (var kvp in this.ReceivedFromInstance)
                    {
                        // A last-delivered timestamp that fell behind the horizon is reset.
                        if (kvp.Value.Last < this.ReceiveHorizon)
                        {
                            kvp.Value.Last = DateTime.MinValue;
                        }

                        // NOTE(review): TryDeliverNextMessage is defined elsewhere; presumably it removes
                        // and returns the next buffered message that is now deliverable - confirm.
                        while (this.TryDeliverNextMessage(kvp.Value, out var next))
                        {
                            yield return next;
                        }

                        if (kvp.Value.Last == DateTime.MinValue
                            && (kvp.Value.Buffered == null || kvp.Value.Buffered.Count == 0))
                        {
                            // Nothing delivered inside the window and nothing buffered: drop this entry.
                            emptyReceiveBuffers.Add(kvp.Key);
                        }
                        else
                        {
                            sourceCount++;
                            messageCount += kvp.Value.Buffered?.Count ?? 0;
                        }
                    }

                    foreach (var t in emptyReceiveBuffers)
                    {
                        this.ReceivedFromInstance.Remove(t);
                    }

                    if (this.ReceivedFromInstance.Count == 0)
                    {
                        this.ReceivedFromInstance = null;
                    }
                }
            }

            // Messages older than the reorder window are not participating.
            if (message.Timestamp < this.ReceiveHorizon)
            {
                // Just pass the message through.
                yield return message;

                this.RecordCollectionStats(countingEntries, sourceCount, messageCount);
                yield break;
            }

            ReceiveBuffer receiveBuffer;

            if (this.ReceivedFromInstance == null)
            {
                this.ReceivedFromInstance = new Dictionary<string, ReceiveBuffer>(StringComparer.OrdinalIgnoreCase);
            }

            if (!this.ReceivedFromInstance.TryGetValue(message.ParentInstanceId, out receiveBuffer))
            {
                // First message seen from this sender (or its entry was collected earlier).
                this.ReceivedFromInstance[message.ParentInstanceId] = receiveBuffer = new ReceiveBuffer()
                {
                    ExecutionId = message.ParentExecutionId,
                };

                sourceCount++;
            }
            else if (receiveBuffer.ExecutionId != message.ParentExecutionId)
            {
                // this message is from a new execution; release all buffered messages and start over
                if (receiveBuffer.Buffered != null)
                {
                    foreach (var kvp in receiveBuffer.Buffered)
                    {
                        yield return kvp.Value;
                        messageCount--;
                    }

                    receiveBuffer.Buffered.Clear();
                }

                receiveBuffer.Last = DateTime.MinValue;
                receiveBuffer.ExecutionId = message.ParentExecutionId;
            }

            if (message.Timestamp <= receiveBuffer.Last)
            {
                // This message was already delivered, it's a duplicate.
                // Still publish the stats: a collection pass may have run above, and leaving
                // NumSources/NumMessages untouched would expose values from an earlier call.
                this.RecordCollectionStats(countingEntries, sourceCount, messageCount);
                yield break;
            }

            if (message.Predecessor > receiveBuffer.Last
                && message.Predecessor >= this.ReceiveHorizon)
            {
                // this message is waiting for a non-delivered predecessor in the window, buffer it
                if (receiveBuffer.Buffered == null)
                {
                    receiveBuffer.Buffered = new SortedDictionary<DateTime, RequestMessage>();
                }

                receiveBuffer.Buffered[message.Timestamp] = message;
                messageCount++;
            }
            else
            {
                yield return message;

                receiveBuffer.Last = message.Timestamp >= this.ReceiveHorizon ? message.Timestamp : DateTime.MinValue;

                // Delivering this message may unblock messages that were buffered behind it.
                while (this.TryDeliverNextMessage(receiveBuffer, out var next))
                {
                    yield return next;
                    messageCount--;
                }
            }

            this.RecordCollectionStats(countingEntries, sourceCount, messageCount);
        }

        /// <summary>
        /// Stores the source/message counts from a collection pass, or clears them to <c>null</c>
        /// when the current call did not perform one.
        /// </summary>
        private void RecordCollectionStats(bool counted, int sourceCount, int messageCount)
        {
            this.NumSources = counted ? sourceCount : (int?)null;
            this.NumMessages = counted ? messageCount : (int?)null;
        }