in BASE/src/ServerTelemetryChannel/Implementation/SamplingPercentageEstimatorTelemetryProcessor.cs [20:274]
/// <summary>
/// Represents a method that is invoked every time the sampling percentage is evaluated.
/// </summary>
/// <param name="afterSamplingTelemetryItemRatePerSecond">Observed rate of telemetry items per second, counted after sampling was applied.</param>
/// <param name="currentSamplingPercentage">Sampling percentage that was in effect when the evaluation ran.</param>
/// <param name="newSamplingPercentage">Sampling percentage suggested by the evaluation.</param>
/// <param name="isSamplingPercentageChanged">True if the suggested sampling percentage is going to be applied as a result of this evaluation.</param>
/// <param name="settings">Estimator settings that were used for the evaluation.</param>
public delegate void AdaptiveSamplingPercentageEvaluatedCallback(
double afterSamplingTelemetryItemRatePerSecond,
double currentSamplingPercentage,
double newSamplingPercentage,
bool isSamplingPercentageChanged,
Channel.Implementation.SamplingPercentageEstimatorSettings settings);
/// <summary>
/// Telemetry processor to estimate ideal sampling percentage.
/// </summary>
/// <remarks>
/// Every item passed to <see cref="Process"/> is counted and forwarded unchanged to the next
/// processor. A timer periodically converts the counts into an observed item rate, derives a
/// suggested sampling rate from it and, when the configured change timeouts allow it, applies
/// the suggestion to <see cref="CurrentSamplingRate"/> and <see cref="CurrentProactiveSamplingRate"/>.
/// </remarks>
internal class SamplingPercentageEstimatorTelemetryProcessor : ITelemetryProcessor, IDisposable
{
    /// <summary>
    /// Next-in-chain processor.
    /// </summary>
    private readonly ITelemetryProcessor next;

    /// <summary>
    /// Dynamic sampling estimator settings.
    /// </summary>
    private readonly Channel.Implementation.SamplingPercentageEstimatorSettings settings;

    /// <summary>
    /// Callback to invoke every time sampling percentage is evaluated.
    /// </summary>
    private readonly Channel.Implementation.AdaptiveSamplingPercentageEvaluatedCallback evaluationCallback;

    /// <summary>
    /// Average telemetry item counter.
    /// </summary>
    private ExponentialMovingAverageCounter itemCount;

    /// <summary>
    /// Average proactively SampledIn telemetry item counter.
    /// </summary>
    private ExponentialMovingAverageCounter proactivelySampledInCount;

    /// <summary>
    /// Evaluation timer. Set to null once the processor is disposed.
    /// </summary>
    private Timer evaluationTimer;

    /// <summary>
    /// Current evaluation interval.
    /// </summary>
    private TimeSpan evaluationInterval;

    /// <summary>
    /// Last date and time sampling percentage was changed.
    /// </summary>
    private DateTimeOffset samplingPercentageLastChangeDateTime;

    /// <summary>
    /// Initializes a new instance of the <see cref="SamplingPercentageEstimatorTelemetryProcessor"/> class
    /// with default estimator settings and no evaluation callback.
    /// </summary>
    /// <param name="next">Next TelemetryProcessor in call chain.</param>
    /// <exception cref="ArgumentNullException"><paramref name="next"/> is null.</exception>
    public SamplingPercentageEstimatorTelemetryProcessor(ITelemetryProcessor next)
        : this(new Channel.Implementation.SamplingPercentageEstimatorSettings(), null, next)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="SamplingPercentageEstimatorTelemetryProcessor"/> class.
    /// </summary>
    /// <param name="settings">Dynamic sampling estimator settings.</param>
    /// <param name="callback">Callback to invoke every time sampling percentage is evaluated. May be null.</param>
    /// <param name="next">Next TelemetryProcessor in call chain.</param>
    /// <exception cref="ArgumentNullException"><paramref name="settings"/> or <paramref name="next"/> is null.</exception>
    public SamplingPercentageEstimatorTelemetryProcessor(
        Channel.Implementation.SamplingPercentageEstimatorSettings settings,
        Channel.Implementation.AdaptiveSamplingPercentageEvaluatedCallback callback,
        ITelemetryProcessor next)
    {
        this.evaluationCallback = callback;
        this.settings = settings ?? throw new ArgumentNullException(nameof(settings));
        this.next = next ?? throw new ArgumentNullException(nameof(next));

        // both rates start from the same configured initial sampling rate
        this.CurrentSamplingRate = settings.EffectiveInitialSamplingRate;
        this.CurrentProactiveSamplingRate = settings.EffectiveInitialSamplingRate;

        this.itemCount = new ExponentialMovingAverageCounter(settings.EffectiveMovingAverageRatio);
        this.proactivelySampledInCount = new ExponentialMovingAverageCounter(settings.EffectiveMovingAverageRatio);

        this.samplingPercentageLastChangeDateTime = PreciseTimestamp.GetUtcNow();

        // use the effective evaluation interval reported by the settings
        this.evaluationInterval = this.settings.EffectiveEvaluationInterval;

        // set up timer to run math to estimate sampling percentage
        this.evaluationTimer = new Timer(
            this.EstimateSamplingPercentage,
            null,
            this.evaluationInterval,
            this.evaluationInterval);
    }

    /// <summary>
    /// Gets or sets current sampling rate.
    /// </summary>
    internal int CurrentSamplingRate { get; set; }

    /// <summary>
    /// Gets current proactive sampling rate.
    /// </summary>
    internal double CurrentProactiveSamplingRate { get; private set; }

    /// <summary>
    /// Processes telemetry item.
    /// </summary>
    /// <param name="item">Telemetry item to process.</param>
    public void Process(ITelemetry item)
    {
        // increment post-sampling telemetry item counter
        this.itemCount.Increment();

        // separately count items whose proactive sampling decision was "sampled in"
        if (item is ISupportAdvancedSampling advancedSamplingItem &&
            advancedSamplingItem.ProactiveSamplingDecision == SamplingDecision.SampledIn)
        {
            this.proactivelySampledInCount.Increment();
        }

        // continue processing telemetry item with the next telemetry processor
        this.next.Process(item);
    }

    /// <summary>
    /// Disposes the object.
    /// </summary>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Checks to see if exponential moving average has changed.
    /// </summary>
    /// <param name="running">Currently running value of moving average.</param>
    /// <param name="current">Value set in the algorithm parameters.</param>
    /// <returns>True if moving average value changed.</returns>
    private static bool MovingAverageCoefficientChanged(double running, double current)
    {
        // tolerance-based comparison instead of == on floating point values
        const double Precision = 1E-12;
        return (running < current - Precision) || (running > current + Precision);
    }

    /// <summary>
    /// Releases the evaluation timer.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    private void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        // read the field once so that a concurrent Dispose (or the timer callback)
        // cannot make it null between the check and the call
        Timer timer = this.evaluationTimer;
        if (timer != null)
        {
            this.evaluationTimer = null;
            timer.Dispose();
        }
    }

    /// <summary>
    /// Callback for sampling percentage evaluation timer.
    /// </summary>
    /// <param name="state">Timer state.</param>
    private void EstimateSamplingPercentage(object state)
    {
        // get observed after-sampling eps
        double observedEps = this.itemCount.StartNewInterval() / this.evaluationInterval.TotalSeconds;

        // get observed eps of proactively sampled-in items
        double observedProactiveEps = this.proactivelySampledInCount.StartNewInterval() / this.evaluationInterval.TotalSeconds;

        // we see events post sampling, so get pre-sampling eps
        double beforeSamplingEps = observedEps * this.CurrentSamplingRate;

        // same scaling for the proactively sampled-in items, using the proactive rate
        double beforeProactiveSamplingEps = observedProactiveEps * this.CurrentProactiveSamplingRate;

        // calculate suggested sampling rate
        int suggestedSamplingRate = (int)Math.Ceiling(beforeSamplingEps / this.settings.EffectiveMaxTelemetryItemsPerSecond);

        // adjust suggested rate so that it fits between min and max configured
        if (suggestedSamplingRate > this.settings.EffectiveMaxSamplingRate)
        {
            suggestedSamplingRate = this.settings.EffectiveMaxSamplingRate;
        }

        if (suggestedSamplingRate < this.settings.EffectiveMinSamplingRate)
        {
            suggestedSamplingRate = this.settings.EffectiveMinSamplingRate;
        }

        // proactive rate is only bounded from below by the configured minimum
        double suggestedProactiveSamplingRate = beforeProactiveSamplingEps / this.settings.EffectiveMaxTelemetryItemsPerSecond;
        if (suggestedProactiveSamplingRate < this.settings.EffectiveMinSamplingRate)
        {
            suggestedProactiveSamplingRate = this.settings.EffectiveMinSamplingRate;
        }

        // see if evaluation interval was changed and apply change
        this.ApplyEvaluationIntervalChange();

        // check to see if sampling rate needs changes
        bool samplingPercentageChangeNeeded = suggestedSamplingRate != this.CurrentSamplingRate;

        if (samplingPercentageChangeNeeded)
        {
            // a higher sampling rate means a lower sampling percentage, so a growing
            // rate is throttled by the "decrease" timeout and a shrinking one by "increase"
            TimeSpan requiredTimeout = suggestedSamplingRate > this.CurrentSamplingRate
                ? this.settings.EffectiveSamplingPercentageDecreaseTimeout
                : this.settings.EffectiveSamplingPercentageIncreaseTimeout;

            // check to see if enough time passed since last sampling % change
            if ((PreciseTimestamp.GetUtcNow() - this.samplingPercentageLastChangeDateTime) < requiredTimeout)
            {
                samplingPercentageChangeNeeded = false;
            }
        }

        // call evaluation callback if provided
        if (this.evaluationCallback != null)
        {
            // we do not want to crash timer thread knocking out the process
            // in case customer-provided callback failed
            try
            {
                this.evaluationCallback(
                    observedEps,
                    100.0 / this.CurrentSamplingRate,
                    100.0 / suggestedSamplingRate,
                    samplingPercentageChangeNeeded,
                    this.settings);
            }
            catch (Exception exp)
            {
                TelemetryChannelEventSource.Log.SamplingCallbackError(exp.ToString());
            }
        }

        if (samplingPercentageChangeNeeded)
        {
            // apply sampling percentage change
            this.samplingPercentageLastChangeDateTime = PreciseTimestamp.GetUtcNow();
            this.CurrentSamplingRate = suggestedSamplingRate;
            this.CurrentProactiveSamplingRate = suggestedProactiveSamplingRate;
        }

        if (samplingPercentageChangeNeeded ||
            MovingAverageCoefficientChanged(this.itemCount.Coefficient, this.settings.EffectiveMovingAverageRatio))
        {
            // since we're observing event count post sampling and we're about
            // to change sampling rate or change coefficient, reset counter
            this.ResetCounters();
        }
    }

    /// <summary>
    /// Picks up a changed evaluation interval from settings and reschedules the timer.
    /// </summary>
    private void ApplyEvaluationIntervalChange()
    {
        TimeSpan configuredInterval = this.settings.EffectiveEvaluationInterval;
        if (this.evaluationInterval == configuredInterval)
        {
            return;
        }

        this.evaluationInterval = configuredInterval;

        // Dispose may run on another thread while this callback is executing, so the
        // timer can be null or already disposed here; neither case may escape the
        // timer thread as an unhandled exception.
        Timer timer = this.evaluationTimer;
        if (timer == null)
        {
            return;
        }

        try
        {
            timer.Change(configuredInterval, configuredInterval);
        }
        catch (ObjectDisposedException)
        {
            // timer was disposed concurrently - there is nothing left to reschedule
        }
    }

    /// <summary>
    /// Replaces both moving average counters with fresh ones using the configured ratio.
    /// </summary>
    private void ResetCounters()
    {
        this.itemCount = new ExponentialMovingAverageCounter(this.settings.EffectiveMovingAverageRatio);
        this.proactivelySampledInCount =
            new ExponentialMovingAverageCounter(this.settings.EffectiveMovingAverageRatio);
    }
}