in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs [70:139]
/// <summary>
/// Converts the supplied trigger metrics into performance heartbeats, asks the
/// performance monitor for a scale recommendation, and maps that recommendation
/// onto a <see cref="ScaleVote"/>.
/// </summary>
/// <param name="workerCount">Current worker count; forwarded unchanged to the performance monitor.</param>
/// <param name="metrics">Metrics samples to evaluate. May be <c>null</c>.</param>
/// <returns>
/// A <see cref="ScaleStatus"/> whose vote is <see cref="ScaleVote.ScaleOut"/> when the monitor
/// recommends adding a worker, <see cref="ScaleVote.ScaleIn"/> when it recommends removing one,
/// and <see cref="ScaleVote.None"/> otherwise (including when <paramref name="metrics"/> is
/// <c>null</c> or no recommendation is returned).
/// </returns>
private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetrics[] metrics)
{
    var scaleStatus = new ScaleStatus() { Vote = ScaleVote.None };
    if (metrics == null)
    {
        return scaleStatus;
    }

    var heartbeats = new PerformanceHeartbeat[metrics.Length];
    for (int i = 0; i < metrics.Length; ++i)
    {
        DurableTaskTriggerMetrics metric = metrics[i];

        // Parse with the invariant culture so the outcome does not depend on the host's
        // current culture. NOTE(review): assumes the latency string was written in a
        // culture-invariant format by the metrics producer - confirm.
        TimeSpan workItemQueueLatency;
        bool parsed = TimeSpan.TryParse(
            metric.WorkItemQueueLatency,
            System.Globalization.CultureInfo.InvariantCulture,
            out workItemQueueLatency);

        heartbeats[i] = new PerformanceHeartbeat()
        {
            PartitionCount = metric.PartitionCount,

            // An unparsable latency is treated as zero rather than failing the whole vote.
            WorkItemQueueLatency = parsed ? workItemQueueLatency : TimeSpan.Zero,
            WorkItemQueueLength = metric.WorkItemQueueLength,
            ControlQueueLengths = DeserializeListOrEmpty<int>(metric.ControlQueueLengths),
            ControlQueueLatencies = DeserializeListOrEmpty<TimeSpan>(metric.ControlQueueLatencies),
        };
    }

    DisconnectedPerformanceMonitor performanceMonitor = this.durableTaskMetricsProvider.GetPerformanceMonitor();

    // heartbeats is already an array, so it is passed directly instead of being copied.
    var scaleRecommendation = performanceMonitor.MakeScaleRecommendation(workerCount, heartbeats);

    switch (scaleRecommendation?.Action)
    {
        case ScaleAction.AddWorker:
            scaleStatus.Vote = ScaleVote.ScaleOut;
            break;
        case ScaleAction.RemoveWorker:
            scaleStatus.Vote = ScaleVote.ScaleIn;
            break;
        default:
            // Covers a null recommendation as well as any other action value.
            scaleStatus.Vote = ScaleVote.None;
            break;
    }

    // Only actual scale-out / scale-in decisions are written to the user-visible log.
    if (scaleStatus.Vote != ScaleVote.None)
    {
        this.logger.LogInformation(
            "Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
            this.hubName,
            scaleStatus.Vote,
            scaleRecommendation?.Reason);
    }

    return scaleStatus;
}

/// <summary>
/// Deserializes a JSON array into a read-only list, returning an empty list when the
/// input is <c>null</c>, empty, or deserializes to <c>null</c> (for example the JSON
/// literal <c>null</c>), so callers never receive a null list.
/// </summary>
/// <typeparam name="T">Element type of the list.</typeparam>
/// <param name="json">Serialized JSON array, or <c>null</c>.</param>
/// <returns>The deserialized list, or an empty list when there is nothing to deserialize.</returns>
private static IReadOnlyList<T> DeserializeListOrEmpty<T>(string json)
{
    if (string.IsNullOrEmpty(json))
    {
        return new List<T>();
    }

    return JsonConvert.DeserializeObject<IReadOnlyList<T>>(json) ?? new List<T>();
}