in src/TriggerBinding/SqlTriggerMetricsProvider.cs [47:99]
/// <summary>
/// Opens a new SQL connection using the configured connection string and executes the command
/// produced by <c>BuildGetUnprocessedChangesCommand</c> inside a RepeatableRead transaction,
/// returning its scalar result as the count of unprocessed changes for the user table.
/// </summary>
/// <returns>The scalar result of the query, cast to <see cref="long"/>.</returns>
/// <remarks>
/// Any exception is logged, reported to telemetry together with the time spent executing the
/// query command, and then rethrown to the caller. If rolling back the transaction also fails,
/// that secondary failure is logged and tracked separately and the original exception is rethrown.
/// </remarks>
private async Task<long> GetUnprocessedChangeCountAsync()
{
    long unprocessedChangeCount = 0L;
    long getUnprocessedChangesDurationMs = 0L;
    try
    {
        using (var connection = new SqlConnection(this._connectionString))
        {
            await connection.OpenAsync();
            int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, CancellationToken.None);
            // NOTE(review): this call carries an Async suffix but is not awaited and its result is assigned
            // directly to a list - presumably the helper is synchronous; confirm against its declaration.
            IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, CancellationToken.None);
            // Use a transaction to automatically release the app lock when we're done executing the query
            using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
            {
                try
                {
                    using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction, primaryKeyColumns, userTableId))
                    {
                        var commandSw = Stopwatch.StartNew();
                        try
                        {
                            unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsyncWithLogging(this._logger, CancellationToken.None, true);
                        }
                        finally
                        {
                            // Capture the elapsed time even when the query throws, so the exception
                            // telemetry emitted by the outer catch reports the real duration rather than 0.
                            getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds;
                        }
                    }
                    transaction.Commit();
                }
                catch (Exception)
                {
                    // Best-effort rollback: a rollback failure is recorded but must not mask the original exception.
                    try
                    {
                        transaction.Rollback();
                    }
                    catch (Exception ex2)
                    {
                        this._logger.LogError($"GetUnprocessedChangeCount : Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
                        TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCountRollback, ex2);
                    }
                    throw;
                }
            }
        }
    }
    catch (Exception ex)
    {
        this._logger.LogError($"Failed to query count of unprocessed changes for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}");
        TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCount, ex, null, new Dictionary<TelemetryMeasureName, double>() { { TelemetryMeasureName.GetUnprocessedChangesDurationMs, getUnprocessedChangesDurationMs } });
        throw;
    }
    return unprocessedChangeCount;
}