in src/TriggerBinding/SqlTableChangeMonitor.cs [277:389]
private async Task GetTableChangesAsync(SqlConnection connection, CancellationToken token)
{
try
{
var transactionSw = Stopwatch.StartNew();
long setLastSyncVersionDurationMs = 0L, getChangesDurationMs = 0L, acquireLeasesDurationMs = 0L;
using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
{
try
{
// Update the version number stored in the global state table if necessary before using it.
using (SqlCommand updateTablesPreInvocationCommand = this.BuildUpdateTablesPreInvocation(connection, transaction))
{
var commandSw = Stopwatch.StartNew();
object result = await updateTablesPreInvocationCommand.ExecuteScalarAsyncWithLogging(this._logger, token, true);
if (result != null)
{
// If we updated the LastSyncVersion we'll get a message back from the query, so log it here
this._logger.LogDebug($"[PreInvocation] {result}");
}
setLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds;
}
var rows = new List<IReadOnlyDictionary<string, object>>();
// Use the version number to query for new changes.
using (SqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
{
var commandSw = Stopwatch.StartNew();
using (SqlDataReader reader = getChangesCommand.ExecuteReader())
{
while (reader.Read())
{
token.ThrowIfCancellationRequested();
rows.Add(SqlBindingUtilities.BuildDictionaryFromSqlRow(reader));
}
}
getChangesDurationMs = commandSw.ElapsedMilliseconds;
}
// Also get the number of rows that currently have lease locks on them
// or are skipped because they have reached their max attempt count.
// This can help with supportability by allowing a customer to see when a
// trigger was processed successfully but returned fewer rows than expected.
string leaseLockedOrMaxAttemptRowCountMessage = await this.GetLeaseLockedOrMaxAttemptRowCountMessage(connection, transaction, token);
if (rows.Count > 0 || leaseLockedOrMaxAttemptRowCountMessage != null)
{
this._logger.LogDebug($"Executed GetChangesCommand in GetTableChangesAsync. {rows.Count} available changed rows. {leaseLockedOrMaxAttemptRowCountMessage}");
}
// If changes were found, acquire leases on them.
if (rows.Count > 0)
{
using (SqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows))
{
var commandSw = Stopwatch.StartNew();
await acquireLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
acquireLeasesDurationMs = commandSw.ElapsedMilliseconds;
}
// Only send event if we got changes to reduce the overall number of events sent since we generally
// only care about the times that we had to actually retrieve and process rows
var measures = new Dictionary<TelemetryMeasureName, double>
{
[TelemetryMeasureName.SetLastSyncVersionDurationMs] = setLastSyncVersionDurationMs,
[TelemetryMeasureName.GetChangesDurationMs] = getChangesDurationMs,
[TelemetryMeasureName.AcquireLeasesDurationMs] = acquireLeasesDurationMs,
[TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
[TelemetryMeasureName.BatchCount] = this._rowsToProcess.Count,
};
TelemetryInstance.TrackEvent(TelemetryEventName.GetChanges, this._telemetryProps, measures);
}
transaction.Commit();
// Set the rows for processing, now since the leases are acquired.
await this._rowsToProcessLock.WaitAsync(token);
this._rowsToProcess = rows;
this._state = State.ProcessingChanges;
this._rowsToProcessLock.Release();
}
catch (Exception)
{
try
{
transaction.Rollback();
}
catch (Exception ex)
{
this._logger.LogError($"Failed to rollback transaction due to exception: {ex.GetType()}. Exception message: {ex.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.GetChangesRollback, ex, this._telemetryProps);
}
throw;
}
}
}
catch (Exception e)
{
// If there's an exception in any part of the process, we want to clear all of our data in memory and
// retry checking for changes again.
await this._rowsToProcessLock.WaitAsync(token);
this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
this._rowsToProcessLock.Release();
this._logger.LogError($"Failed to check for changes in table '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.GetChanges, e, this._telemetryProps);
if (e.IsFatalSqlException() || connection.IsBrokenOrClosed())
{
// If we get a fatal SQL Client exception or the connection is broken let it bubble up so we can try to re-establish the connection
throw;
}
}
}