in src/DurableTask.Core/Entities/StateFormat/MessageSorter.cs [114:241]
/// <summary>
/// Feeds a received message through the sorter and yields every message that becomes
/// deliverable as a result (possibly none, possibly several).
/// </summary>
/// <remarks>
/// This method is an iterator: the sorter state (<c>ReceiveHorizon</c> and
/// <c>ReceivedFromInstance</c>) is only updated while the returned sequence is enumerated.
/// </remarks>
/// <param name="message">The message that was just received.</param>
/// <param name="reorderWindow">The reorder window; when it is zero the message is passed through unsorted.</param>
/// <returns>The messages to deliver, in the order in which they are yielded.</returns>
public IEnumerable<RequestMessage> ReceiveInOrder(RequestMessage message, TimeSpan reorderWindow)
{
    // Messages sent from clients and forwarded lock messages do not take part in sorting,
    // and a zero-length window disables sorting altogether.
    bool bypassSorting = reorderWindow == TimeSpan.Zero
        || message.ParentInstanceId == null
        || message.Position > 0;

    if (bypassSorting)
    {
        yield return message;
        yield break;
    }

    // The horizon could be moved to (message.Timestamp - reorderWindow) as soon as
    // (ReceiveHorizon + reorderWindow < message.Timestamp). To keep the cost of the cleanup below
    // down, it is only moved once at least MinIntervalBetweenCollections has additionally passed.
    if (message.Timestamp > ReceiveHorizon + reorderWindow + MinIntervalBetweenCollections)
    {
        ReceiveHorizon = message.Timestamp - reorderWindow;

        if (ReceivedFromInstance != null)
        {
            // Keys are collected first and removed after the loop, so the dictionary is not
            // modified while it is being enumerated.
            var emptiedSenders = new List<string>();

            foreach (var entry in ReceivedFromInstance)
            {
                ReceiveBuffer senderBuffer = entry.Value;

                if (senderBuffer.Last < ReceiveHorizon)
                {
                    // Resetting Last to MinValue makes every future message from this sender
                    // be treated as if it were the first one received.
                    senderBuffer.Last = DateTime.MinValue;
                }

                // Release whatever was being held back but is now past the reorder window.
                while (TryDeliverNextMessage(senderBuffer, out var released))
                {
                    yield return released;
                }

                if (senderBuffer.Last == DateTime.MinValue)
                {
                    var pending = senderBuffer.Buffered;
                    if (pending == null || pending.Count == 0)
                    {
                        // The buffer is back in its initial "empty" state and carries no information.
                        emptiedSenders.Add(entry.Key);
                    }
                }
            }

            foreach (string sender in emptiedSenders)
            {
                ReceivedFromInstance.Remove(sender);
            }

            if (ReceivedFromInstance.Count == 0)
            {
                ReceivedFromInstance = null;
            }
        }
    }

    // A message that is older than the horizon does not take part in sorting either.
    if (ReceiveHorizon > message.Timestamp)
    {
        yield return message;
        yield break;
    }

    if (ReceivedFromInstance == null)
    {
        ReceivedFromInstance = new Dictionary<string, ReceiveBuffer>(StringComparer.OrdinalIgnoreCase);
    }

    if (ReceivedFromInstance.TryGetValue(message.ParentInstanceId, out ReceiveBuffer buffer))
    {
        if (buffer.ExecutionId != message.ParentExecutionId)
        {
            // The sender is on a new execution: flush everything held for the old one and start over.
            if (buffer.Buffered != null)
            {
                foreach (var held in buffer.Buffered)
                {
                    yield return held.Value;
                }

                buffer.Buffered.Clear();
            }

            buffer.Last = DateTime.MinValue;
            buffer.ExecutionId = message.ParentExecutionId;
        }
    }
    else
    {
        // First message seen from this sender: start a fresh buffer for it.
        buffer = new ReceiveBuffer()
        {
            ExecutionId = message.ParentExecutionId,
        };
        ReceivedFromInstance[message.ParentInstanceId] = buffer;
    }

    if (buffer.Last >= message.Timestamp)
    {
        // Already delivered earlier; this is a duplicate, so drop it.
        yield break;
    }

    bool predecessorOutstanding = message.Predecessor > buffer.Last
        && message.Predecessor >= ReceiveHorizon;

    if (!predecessorOutstanding)
    {
        yield return message;

        if (message.Timestamp >= ReceiveHorizon)
        {
            buffer.Last = message.Timestamp;
        }
        else
        {
            buffer.Last = DateTime.MinValue;
        }

        // Delivering this message may unblock messages that were buffered behind it.
        while (TryDeliverNextMessage(buffer, out var unblocked))
        {
            yield return unblocked;
        }

        yield break;
    }

    // The predecessor lies inside the window and has not been delivered yet: hold this message back.
    if (buffer.Buffered == null)
    {
        buffer.Buffered = new SortedDictionary<DateTime, RequestMessage>();
    }

    buffer.Buffered[message.Timestamp] = message;
}