in src/TriggersBinding/MySqlTriggerMetricsProvider.cs [45:95]
private async Task<long> GetUnprocessedChangeCountAsync()
{
long unprocessedChangeCount = 0L;
// long getUnprocessedChangesDurationMs = 0L;
try
{
using (var connection = new MySqlConnection(this._connectionString))
{
await connection.OpenAsync();
string userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, CancellationToken.None);
IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, this._userTable.FullName, this._logger, CancellationToken.None);
// Use a transaction to automatically release the app lock when we're done executing the query
using (MySqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
{
try
{
using (MySqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction, primaryKeyColumns, userTableId))
{
var commandSw = Stopwatch.StartNew();
unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsyncWithLogging(this._logger, CancellationToken.None, true);
this._logger.LogDebug($"The unprocessed changes count is {unprocessedChangeCount}");
}
transaction.Commit();
}
catch (Exception)
{
try
{
transaction.Rollback();
}
catch (Exception ex2)
{
this._logger.LogError($"GetUnprocessedChangeCount : Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
}
throw;
}
}
}
}
catch (Exception ex)
{
this._logger.LogError($"Failed to query count of unprocessed changes for the specified table' due to exception: {ex.GetType()}. Exception message: {ex.Message}");
throw;
}
return unprocessedChangeCount;
}