in src/WebJobs.Extensions.DurableTask/EntityScheduler/MessageSorter.cs [51:120]
/// <summary>
/// Labels an outgoing message with a send timestamp and, when an earlier send to the same
/// destination is still being tracked, with that earlier timestamp as its predecessor.
/// Occasionally also discards tracked destinations whose last send is older than the send horizon.
/// </summary>
/// <param name="message">The message to label. Its <c>Timestamp</c> is always set (unless sorting is
/// disabled); its <c>Predecessor</c> is set only when a previous send to <paramref name="destination"/> is tracked.</param>
/// <param name="destination">Key under which the last send time is tracked. When this method creates
/// the tracking dictionary, keys are compared with <see cref="StringComparer.OrdinalIgnoreCase"/>.</param>
/// <param name="now">The caller's current time. Used as the timestamp unless it is not later than the
/// previous timestamp for the same destination.</param>
/// <param name="reorderWindow">The reorder window. A value of zero ticks disables labeling entirely.</param>
public void LabelOutgoingMessage(RequestMessage message, string destination, DateTime now, TimeSpan reorderWindow)
{
    if (reorderWindow.Ticks == 0)
    {
        return; // we are not doing any message sorting.
    }

    DateTime timestamp = now;

    // The destination count is only computed on calls that run a collection pass.
    bool countingEntries = false;
    int destinationCount = 0;

    if (this.SendHorizon + reorderWindow + MinIntervalBetweenCollections < now)
    {
        countingEntries = true;
        this.SendHorizon = now - reorderWindow;

        // clean out send clocks that are past the reorder window
        if (this.LastSentToInstance != null)
        {
            // Keys are gathered first because the dictionary cannot be modified while enumerating it.
            var expired = new List<string>();

            foreach (var kvp in this.LastSentToInstance)
            {
                if (kvp.Value < this.SendHorizon)
                {
                    expired.Add(kvp.Key);
                }
                else
                {
                    destinationCount++;
                }
            }

            foreach (var key in expired)
            {
                this.LastSentToInstance.Remove(key);
            }
        }
    }

    if (this.LastSentToInstance == null)
    {
        this.LastSentToInstance = new Dictionary<string, DateTime>(StringComparer.OrdinalIgnoreCase);
        destinationCount++; // the current destination becomes the first tracked entry
    }
    else if (this.LastSentToInstance.TryGetValue(destination, out var last))
    {
        // Already tracked (and already counted if a collection pass ran above).
        message.Predecessor = last;

        // ensure timestamps are monotonic even if system clock is not
        if (timestamp <= last)
        {
            // AddTicks keeps the DateTimeKind of the previous timestamp; constructing a DateTime
            // from raw ticks would silently reset it to DateTimeKind.Unspecified.
            timestamp = last.AddTicks(1);
        }
    }
    else
    {
        destinationCount++; // the current destination is added as a new entry below
    }

    message.Timestamp = timestamp;
    this.LastSentToInstance[destination] = timestamp;

    // Publish the count only when it was actually computed; otherwise reset it to null.
    this.NumDestinations = countingEntries ? destinationCount : (int?)null;
}