in src/TriggersBinding/MySqlTableChangeMonitor.cs [244:324]
/// <summary>
/// Runs the "get changes" query inside a transaction, and, when it returns rows, runs the
/// "acquire leases" command for those rows in the same transaction. After the transaction
/// commits, the rows are stored in <c>_rowsToProcess</c> and the state is switched to
/// <c>State.ProcessingChanges</c> (this happens even when no rows were returned).
/// </summary>
/// <param name="connection">Open connection used to begin the transaction and execute both commands.</param>
/// <param name="token">Cancellation token observed while reading rows, acquiring leases and waiting on the rows lock.</param>
/// <returns>A task that completes once the rows have been published or the failure has been handled.</returns>
/// <remarks>
/// On failure the in-memory rows are reset to an empty list and the error is logged. The exception
/// is rethrown only when the connection reports itself broken or closed; otherwise it is swallowed
/// so the caller can retry. If <paramref name="token"/> is already cancelled when the failure is
/// handled, waiting on the rows lock throws an <see cref="OperationCanceledException"/>, which
/// propagates to the caller instead.
/// </remarks>
private async Task GetTableChangesAsync(MySqlConnection connection, CancellationToken token)
{
    try
    {
        var rows = new List<IReadOnlyDictionary<string, object>>();

        using (MySqlTransaction transaction = connection.BeginTransaction())
        {
            try
            {
                // query for new changes.
                using (MySqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
                {
                    this._logger.LogInformation("Looking for latest changes on the configured table");
                    using (MySqlDataReader reader = getChangesCommand.ExecuteReaderWithLogging(this._logger))
                    {
                        while (reader.Read())
                        {
                            token.ThrowIfCancellationRequested();
                            rows.Add(MySqlBindingUtilities.BuildDictionaryFromMySqlRow(reader));
                        }
                    }
                }

                // If changes were found, acquire leases on them within the same transaction.
                if (rows.Count > 0)
                {
                    this._logger.LogInformation($"The total no of rows found to process is {rows.Count}");
                    using (MySqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows))
                    {
                        this._logger.LogDebug("Acquiring lease ...");
                        await acquireLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                    }
                }

                transaction.Commit();
            }
            catch (Exception)
            {
                // Only failures up to and including Commit() reach this handler, so a rollback is
                // never attempted on a transaction that has already been committed.
                try
                {
                    transaction.Rollback();
                }
                catch (Exception ex)
                {
                    this._logger.LogError($"Failed to rollback transaction due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                }
                throw;
            }
        }

        // Set the rows for processing, now since the leases are acquired. This is done outside the
        // transaction scope: a cancelled wait here must not trigger a rollback of committed work.
        await this._rowsLock.WaitAsync(token);
        try
        {
            this._rowsToProcess = rows;
            this._state = State.ProcessingChanges;
        }
        finally
        {
            this._rowsLock.Release();
        }
    }
    catch (Exception e)
    {
        // If there's an exception in any part of the process, we want to clear all of our data in memory and
        // retry checking for changes again.
        // NOTE: if the token is already cancelled this wait throws, so the cancellation propagates
        // to the caller without the reset or the log below.
        await this._rowsLock.WaitAsync(token);
        try
        {
            this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
        }
        finally
        {
            this._rowsLock.Release();
        }

        this._logger.LogError($"Failed to check for changes in the specified table due to exception: {e.GetType()}. Exception message: {e.Message}");
        if (connection.IsBrokenOrClosed()) // TODO: e.IsFatalMySqlException() || - check mysql corresponding
        {
            // If the connection is broken let it bubble up so we can try to re-establish the connection
            throw;
        }
    }
}