in src/WebJobs.Extensions.DurableTask/EntityScheduler/MessageSorter.cs [125:267]
public IEnumerable<RequestMessage> ReceiveInOrder(RequestMessage message, TimeSpan reorderWindow)
{
    // Processes one incoming message and yields, in delivery order, every message that can
    // now be handed on: the message itself and/or messages that were buffered earlier.
    // A message may also be buffered (nothing yielded for it yet) or dropped as a duplicate.
    //
    //   message       - the incoming message; its ParentInstanceId, ParentExecutionId, Position,
    //                   Timestamp and Predecessor are used to decide what to do with it.
    //   reorderWindow - how far behind the latest timestamp the receive horizon is kept;
    //                   a zero window disables sorting altogether.
    //
    // As a side effect, NumSources and NumMessages are set to the number of tracked sources
    // and buffered messages when this call advanced the horizon, and to null otherwise.
    //
    // This is an iterator: all state changes below happen lazily, as the caller enumerates
    // the result, so statement order relative to each yield matters.

    // messages sent from clients and forwarded lock messages are not participating in the sorting.
    if (reorderWindow.Ticks == 0 || message.ParentInstanceId == null || message.Position > 0)
    {
        // Just pass the message through.
        yield return message;
        yield break;
    }

    // The counts are only computed when the horizon advances, because that is the only
    // time all receive buffers are visited.
    bool countingEntries = false;
    int sourceCount = 0;
    int messageCount = 0;

    // advance the horizon based on the latest timestamp
    if (this.ReceiveHorizon + reorderWindow + MinIntervalBetweenCollections < message.Timestamp)
    {
        countingEntries = true;
        this.ReceiveHorizon = message.Timestamp - reorderWindow;

        // deliver any messages that were held in the receive buffers
        // but are now past the reorder window
        if (this.ReceivedFromInstance != null)
        {
            // Keys are collected here and removed after the loop, since the dictionary
            // cannot be modified while it is being enumerated.
            List<string> emptyReceiveBuffers = new List<string>();

            foreach (var kvp in this.ReceivedFromInstance)
            {
                // A last-delivered timestamp that fell behind the horizon is reset to MinValue.
                if (kvp.Value.Last < this.ReceiveHorizon)
                {
                    kvp.Value.Last = DateTime.MinValue;
                }

                while (this.TryDeliverNextMessage(kvp.Value, out var next))
                {
                    yield return next;
                }

                if (kvp.Value.Last == DateTime.MinValue
                    && (kvp.Value.Buffered == null || kvp.Value.Buffered.Count == 0))
                {
                    // Nothing left to remember for this source; drop its buffer.
                    emptyReceiveBuffers.Add(kvp.Key);
                }
                else
                {
                    sourceCount++;
                    messageCount += kvp.Value.Buffered?.Count ?? 0;
                }
            }

            foreach (var t in emptyReceiveBuffers)
            {
                this.ReceivedFromInstance.Remove(t);
            }

            if (this.ReceivedFromInstance.Count == 0)
            {
                this.ReceivedFromInstance = null;
            }
        }
    }

    // Messages older than the reorder window are not participating.
    if (message.Timestamp < this.ReceiveHorizon)
    {
        // Just pass the message through.
        yield return message;
        this.NumSources = countingEntries ? sourceCount : (int?)null;
        this.NumMessages = countingEntries ? messageCount : (int?)null;
        yield break;
    }

    ReceiveBuffer receiveBuffer;

    if (this.ReceivedFromInstance == null)
    {
        this.ReceivedFromInstance = new Dictionary<string, ReceiveBuffer>(StringComparer.OrdinalIgnoreCase);
    }

    if (!this.ReceivedFromInstance.TryGetValue(message.ParentInstanceId, out receiveBuffer))
    {
        // First message seen from this source: start tracking it.
        this.ReceivedFromInstance[message.ParentInstanceId] = receiveBuffer = new ReceiveBuffer()
        {
            ExecutionId = message.ParentExecutionId,
        };
        sourceCount++;
    }
    else if (receiveBuffer.ExecutionId != message.ParentExecutionId)
    {
        // this message is from a new execution; release all buffered messages and start over
        if (receiveBuffer.Buffered != null)
        {
            foreach (var kvp in receiveBuffer.Buffered)
            {
                yield return kvp.Value;
                messageCount--;
            }

            receiveBuffer.Buffered.Clear();
        }

        receiveBuffer.Last = DateTime.MinValue;
        receiveBuffer.ExecutionId = message.ParentExecutionId;
    }

    if (message.Timestamp <= receiveBuffer.Last)
    {
        // This message was already delivered, it's a duplicate
        // NOTE(review): this exit leaves NumSources/NumMessages untouched, unlike the
        // other exits below the first pass-through check - confirm that is intended.
        yield break;
    }

    if (message.Predecessor > receiveBuffer.Last
        && message.Predecessor >= this.ReceiveHorizon)
    {
        // this message is waiting for a non-delivered predecessor in the window, buffer it
        if (receiveBuffer.Buffered == null)
        {
            receiveBuffer.Buffered = new SortedDictionary<DateTime, RequestMessage>();
        }

        receiveBuffer.Buffered[message.Timestamp] = message;
        messageCount++;
    }
    else
    {
        // Deliver this message, record it as the latest delivered from its source, then
        // drain whatever TryDeliverNextMessage now reports as deliverable from the buffer.
        yield return message;
        receiveBuffer.Last = message.Timestamp >= this.ReceiveHorizon ? message.Timestamp : DateTime.MinValue;

        while (this.TryDeliverNextMessage(receiveBuffer, out var next))
        {
            yield return next;
            messageCount--;
        }
    }

    this.NumSources = countingEntries ? sourceCount : (int?)null;
    this.NumMessages = countingEntries ? messageCount : (int?)null;
}