in src/TriggerBinding/SqlTableChangeMonitor.cs [517:598]
/// <summary>
/// Renews the leases on the rows currently being processed, provided the monitor is in the
/// <see cref="State.ProcessingChanges"/> state and there is at least one row to process.
/// Each invocation that reaches the renewal attempt also increments the lease renewal counter;
/// when that counter reaches <c>MaxLeaseRenewalCount</c> (and <paramref name="token"/> has not been
/// cancelled) the function-execution cancellation token source is cancelled and replaced.
/// </summary>
/// <param name="connection">The connection used to begin the transaction and run the renew-leases command.</param>
/// <param name="token">Token used to cancel waiting for the rows lock and the command execution.</param>
/// <returns>A task that completes once the renewal attempt has finished and the rows lock has been released.</returns>
/// <remarks>
/// Exceptions raised while building or executing the renew-leases command are logged, reported to telemetry
/// and swallowed. Exceptions raised outside of that (e.g. while beginning the transaction) propagate to the
/// caller, but the rows lock is released in every case.
/// </remarks>
private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken token)
{
    // Acquire the lock OUTSIDE of the try block: if the wait itself throws (e.g. on cancellation) we never
    // obtained the lock and therefore must not release it.
    await this._rowsToProcessLock.WaitAsync(token);
    try
    {
        if (this._state != State.ProcessingChanges || this._rowsToProcess.Count == 0)
        {
            // Nothing is being processed right now, so there are no leases to renew.
            return;
        }

        // Use a transaction to automatically release the app lock when we're done executing the query
        using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
        {
            try
            {
                SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection, transaction);
                // A null command means there is nothing to execute; the transaction is then left
                // uncommitted and cleaned up when it is disposed.
                if (renewLeasesCommand != null)
                {
                    using (renewLeasesCommand)
                    {
                        var stopwatch = Stopwatch.StartNew();
                        int rowsAffected = await renewLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token, true);
                        long durationMs = stopwatch.ElapsedMilliseconds;
                        if (rowsAffected > 0)
                        {
                            this._logger.LogDebug($"Renewed leases for {rowsAffected} rows");
                            // Only send an event if we actually updated rows to reduce the overall number of events we send
                            var measures = new Dictionary<TelemetryMeasureName, double>
                            {
                                [TelemetryMeasureName.DurationMs] = durationMs,
                            };
                            TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeases, this._telemetryProps, measures);
                        }
                        transaction.Commit();
                    }
                }
            }
            catch (Exception e)
            {
                // If we fail to renew the leases, multiple workers could be processing the same change
                // data, but we have functionality in place to deal with this (see design doc), so the
                // failure is logged and reported rather than rethrown.
                this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}");
                TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps);
                try
                {
                    transaction.Rollback();
                }
                catch (Exception e2)
                {
                    this._logger.LogError($"RenewLeases - Failed to rollback transaction due to exception: {e2.GetType()}. Exception message: {e2.Message}");
                    TelemetryInstance.TrackException(TelemetryErrorName.RenewLeasesRollback, e2, this._telemetryProps);
                }
            }
            finally
            {
                // Do we want to update this count even in the case of a failure to renew the leases? Probably,
                // because the count is simply meant to indicate how much time the other thread has spent processing
                // changes essentially.
                this._leaseRenewalCount += 1;
                // If this thread has been cancelled, then the _cancellationTokenSourceExecutor could have already
                // been disposed so shouldn't cancel it.
                if (this._leaseRenewalCount == MaxLeaseRenewalCount && !token.IsCancellationRequested)
                {
                    this._logger.LogWarning("Call to execute the function (TryExecuteAsync) seems to be stuck, so it is being cancelled");
                    // If we keep renewing the leases, the thread responsible for processing the changes is stuck.
                    // If it's stuck, it has to be stuck in the function execution call (I think), so we should
                    // cancel the call.
                    this._cancellationTokenSourceExecutor.Cancel();
                    this._cancellationTokenSourceExecutor = new CancellationTokenSource();
                }
            }
        }
    }
    finally
    {
        // Always release the lock, even if beginning the transaction, renewing the leases or cancelling the
        // executor threw. Otherwise any other code waiting on this lock would be blocked indefinitely.
        this._rowsToProcessLock.Release();
    }
}