public IEnumerable<RequestMessage> ReceiveInOrder(RequestMessage message, TimeSpan reorderWindow)

in src/DurableTask.Core/Entities/StateFormat/MessageSorter.cs [114:241]


        /// <summary>
        /// Feeds one incoming message into the sorter and lazily yields whatever is ready for delivery:
        /// the message itself, previously buffered messages that have become deliverable, both,
        /// or nothing at all (when the message is buffered or recognized as a duplicate).
        /// </summary>
        /// <param name="message">The message that was just received.</param>
        /// <param name="reorderWindow">The reorder window; a window of zero ticks bypasses sorting.</param>
        /// <returns>The messages to deliver, in the order in which they are yielded.</returns>
        /// <remarks>
        /// This is an iterator method: the sorter state (<c>ReceiveHorizon</c>, <c>ReceivedFromInstance</c>
        /// and the per-sender buffers) is updated while the result is being enumerated, so all updates
        /// have only been applied once the returned sequence has been enumerated to the end.
        /// </remarks>
        public IEnumerable<RequestMessage> ReceiveInOrder(RequestMessage message, TimeSpan reorderWindow)
        {
            // Messages sent from clients and forwarded lock messages do not take part in the sorting,
            // and a zero-length window switches sorting off altogether.
            bool bypassesSorting = reorderWindow.Ticks == 0
                || message.ParentInstanceId == null
                || message.Position > 0;

            if (bypassesSorting)
            {
                // Hand the message straight back.
                yield return message;
                yield break;
            }

            // The receive horizon could be advanced to (message.Timestamp - reorderWindow) as soon as
            // (ReceiveHorizon + reorderWindow < message.Timestamp). To keep the cost of the cleanup below
            // down, the advance is postponed until at least MinIntervalBetweenCollections has elapsed as well.
            if (message.Timestamp > ReceiveHorizon + reorderWindow + MinIntervalBetweenCollections)
            {
                ReceiveHorizon = message.Timestamp - reorderWindow;

                if (ReceivedFromInstance != null)
                {
                    // Keys are collected here and removed after the loop, because the dictionary
                    // must not be modified while it is being enumerated.
                    var emptiedSenders = new List<string>();

                    foreach (var entry in ReceivedFromInstance)
                    {
                        var held = entry.Value;

                        if (held.Last < ReceiveHorizon)
                        {
                            // Resetting Last to MinValue means every message received from now on
                            // is treated as if it were the first one received.
                            held.Last = DateTime.MinValue;
                        }

                        // Release whatever was held back in this buffer and is deliverable now
                        // that the horizon has moved.
                        while (TryDeliverNextMessage(held, out var released))
                        {
                            yield return released;
                        }

                        bool isBackToInitialState = held.Last == DateTime.MinValue
                            && (held.Buffered == null || held.Buffered.Count == 0);

                        if (isBackToInitialState)
                        {
                            // The buffer carries no relevant information anymore, so it need not be stored.
                            emptiedSenders.Add(entry.Key);
                        }
                    }

                    foreach (var sender in emptiedSenders)
                    {
                        ReceivedFromInstance.Remove(sender);
                    }

                    if (ReceivedFromInstance.Count == 0)
                    {
                        ReceivedFromInstance = null;
                    }
                }
            }

            // A message older than the receive horizon does not take part in the sorting either.
            if (message.Timestamp < ReceiveHorizon)
            {
                yield return message;
                yield break;
            }

            if (ReceivedFromInstance == null)
            {
                ReceivedFromInstance = new Dictionary<string, ReceiveBuffer>(StringComparer.OrdinalIgnoreCase);
            }

            if (ReceivedFromInstance.TryGetValue(message.ParentInstanceId, out ReceiveBuffer current))
            {
                if (current.ExecutionId != message.ParentExecutionId)
                {
                    // The sender is running a new execution: flush everything still buffered
                    // for the previous one and start over.
                    if (current.Buffered != null)
                    {
                        foreach (var stale in current.Buffered)
                        {
                            yield return stale.Value;
                        }

                        current.Buffered.Clear();
                    }

                    current.Last = DateTime.MinValue;
                    current.ExecutionId = message.ParentExecutionId;
                }
            }
            else
            {
                // First message seen from this sender: start a fresh buffer for it.
                current = new ReceiveBuffer
                {
                    ExecutionId = message.ParentExecutionId,
                };
                ReceivedFromInstance[message.ParentInstanceId] = current;
            }

            if (current.Last >= message.Timestamp)
            {
                // Already delivered earlier; this is a duplicate, so nothing is yielded.
                yield break;
            }

            bool awaitsPredecessor = message.Predecessor > current.Last
                && message.Predecessor >= ReceiveHorizon;

            if (!awaitsPredecessor)
            {
                yield return message;

                // Record the delivery only after the message has been handed to the consumer.
                if (message.Timestamp >= ReceiveHorizon)
                {
                    current.Last = message.Timestamp;
                }
                else
                {
                    current.Last = DateTime.MinValue;
                }

                // Delivering this message may have unblocked buffered successors.
                while (TryDeliverNextMessage(current, out var unblocked))
                {
                    yield return unblocked;
                }

                yield break;
            }

            // The predecessor lies inside the window and has not been delivered yet: hold the message back.
            if (current.Buffered == null)
            {
                current.Buffered = new SortedDictionary<DateTime, RequestMessage>();
            }

            current.Buffered[message.Timestamp] = message;
        }