in src/TriggerBinding/SqlTableChangeMonitor.cs [191:271]
private async Task RunChangeConsumptionLoopAsync()
{
    // Polls on a single long-lived connection until the check-for-changes token is cancelled. Each iteration
    // walks the state machine (CheckingForChanges -> ProcessingChanges -> Cleanup) and then waits for the
    // polling interval before starting the next iteration.
    this._logger.LogDebug($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}");
    try
    {
        CancellationToken token = this._cancellationTokenSourceCheckForChanges.Token;
        using (var connection = new SqlConnection(this._connectionString))
        {
            await connection.OpenAsync(token);
            bool forceReconnect = false;
            // Check for cancellation request only after a cycle of checking and processing of changes completes.
            while (!token.IsCancellationRequested)
            {
                bool isConnected = await connection.TryEnsureConnected(forceReconnect, this._logger, "ChangeConsumptionConnection", token);
                if (!isConnected)
                {
                    // If we couldn't reconnect then wait our delay and try again
                    await Task.Delay(TimeSpan.FromMilliseconds(this._pollingIntervalInMs), token);
                    continue;
                }

                // The connection is usable again, so stop forcing a reconnect on subsequent iterations.
                forceReconnect = false;

                try
                {
                    // Process states sequentially since we normally expect the state to transition at the end
                    // of each previous state - but if an unexpected error occurs we'll skip the rest and then
                    // retry that state after the delay
                    if (this._state == State.CheckingForChanges)
                    {
                        await this.GetTableChangesAsync(connection, token);
                    }
                    if (this._state == State.ProcessingChanges)
                    {
                        await this.ProcessTableChangesAsync(token);
                    }
                    if (this._state == State.Cleanup)
                    {
                        await this.ReleaseLeasesAsync(connection, token);
                    }
                }
                catch (Exception e) when (e.IsFatalSqlException() || connection.IsBrokenOrClosed())
                {
                    // Retry connection if there was a fatal SQL exception or something else caused the connection to be closed
                    // since that indicates some other issue occurred (such as dropped network) and may be able to be recovered
                    this._logger.LogError($"Fatal SQL Client exception processing changes. Will attempt to reestablish connection in {this._pollingIntervalInMs}ms. Exception = {e.Message}");
                    forceReconnect = true;
                }
                catch (Exception e) when (e.IsDeadlockException())
                {
                    // Deadlocks aren't fatal and don't need a reconnection so just let the loop try again after the normal delay
                }

                await Task.Delay(TimeSpan.FromMilliseconds(this._pollingIntervalInMs), token);
            }
        }
    }
    catch (Exception e)
    {
        // Only want to log the exception if it wasn't caused by cancellation (e.g. StopAsync being called).
        // Task.Delay throws TaskCanceledException when cancelled, but other token-aware calls may surface the
        // base OperationCanceledException instead, so test against the base type rather than comparing the
        // exact runtime type - otherwise a normal shutdown gets reported as an error.
        if (!(e is OperationCanceledException))
        {
            this._logger.LogError($"Exiting change consumption loop due to exception: {e.GetType()}. Exception message: {e.Message}");
            TelemetryInstance.TrackException(TelemetryErrorName.ConsumeChangesLoop, e, this._telemetryProps);
        }
        throw;
    }
    finally
    {
        // If this thread exits due to any reason, then the lease renewal thread should exit as well. Otherwise,
        // it will keep looping perpetually.
        this._cancellationTokenSourceRenewLeases.Cancel();
        this._cancellationTokenSourceCheckForChanges.Dispose();
        this._cancellationTokenSourceExecutor.Dispose();
    }
}