in src/TriggersBinding/MySqlTriggerScaleMonitor.cs [86:153]
private ScaleStatus GetScaleStatusCore(int workerCount, MySqlTriggerMetrics[] metrics)
{
// We require minimum 5 samples to estimate the trend of variation in count of unprocessed changes with
// certain reliability. These samples roughly cover the timespan of past 40 seconds.
const int minSamplesForScaling = 5;
var status = new ScaleStatus
{
Vote = ScaleVote.None,
};
// Do not make a scale decision unless we have enough samples.
if (metrics is null || (metrics.Length < minSamplesForScaling))
{
return status;
}
// Consider only the most recent batch of samples in the rest of the method.
metrics = metrics.TakeLast(minSamplesForScaling).ToArray();
string counts = string.Join(", ", metrics.Select(metric => metric.UnprocessedChangeCount));
// Add worker if the count of unprocessed changes per worker exceeds the maximum limit.
long lastUnprocessedChangeCount = metrics.Last().UnprocessedChangeCount;
if (lastUnprocessedChangeCount > workerCount * this._maxChangesPerWorker)
{
status.Vote = ScaleVote.ScaleOut;
this._logger.LogDebug($"Requesting scale-out: Found too many unprocessed changes: {lastUnprocessedChangeCount} for the specified table relative to the number of workers.");
return status;
}
// Check if there is a continuous increase or decrease in count of unprocessed changes.
bool isIncreasing = true;
bool isDecreasing = true;
for (int index = 0; index < metrics.Length - 1; index++)
{
isIncreasing = isIncreasing && metrics[index].UnprocessedChangeCount < metrics[index + 1].UnprocessedChangeCount;
isDecreasing = isDecreasing && (metrics[index + 1].UnprocessedChangeCount == 0 || metrics[index].UnprocessedChangeCount > metrics[index + 1].UnprocessedChangeCount);
}
if (isIncreasing)
{
// Scale out only if the expected count of unprocessed changes would exceed the combined limit after 30 seconds.
DateTime referenceTime = metrics[metrics.Length - 1].Timestamp - TimeSpan.FromSeconds(30);
MySqlTriggerMetrics referenceMetric = metrics.First(metric => metric.Timestamp > referenceTime);
long expectedUnprocessedChangeCount = (2 * metrics[metrics.Length - 1].UnprocessedChangeCount) - referenceMetric.UnprocessedChangeCount;
if (expectedUnprocessedChangeCount > workerCount * this._maxChangesPerWorker)
{
status.Vote = ScaleVote.ScaleOut;
this._logger.LogDebug($"Requesting scale-out: Found the unprocessed changes for the specified table to be continuously increasing" +
" and may exceed the maximum limit set for the workers.");
return status;
}
}
if (isDecreasing)
{
// Scale in only if the count of unprocessed changes will not exceed the combined limit post the scale-in operation.
if (lastUnprocessedChangeCount <= (workerCount - 1) * this._maxChangesPerWorker)
{
status.Vote = ScaleVote.ScaleIn;
this._logger.LogDebug($"Requesting scale-in: The specified table is found to be either idle or the unprocessed changes to be continuously decreasing.");
return status;
}
}
return status;
}