in src/TriggersBinding/MySqlTableChangeMonitor.cs [552:627]
/// <summary>
/// Releases the leases held on the rows in "_rowsToRelease" and then clears the monitor's row state.
/// </summary>
/// <remarks>
/// When there are rows to release, the work is attempted up to <c>MaxRetryReleaseLeases</c> times.
/// Each attempt runs inside its own RepeatableRead transaction and performs, in order:
/// <list type="number">
/// <item><description>the release-leases command;</description></item>
/// <item><description>a count of unprocessed changes relative to the recomputed last polled time;</description></item>
/// <item><description>only if that count is zero: the update of the last polled time in the global state
/// table, followed by the cleanup of the lease table.</description></item>
/// </list>
/// A failed attempt is logged and rolled back; the exception is not rethrown. Regardless of whether the
/// release succeeded (or was needed at all), <c>ClearRowsAsync</c> is awaited at the end.
/// </remarks>
/// <param name="connection">Connection on which the transaction and all commands are created.</param>
/// <param name="token">Cancellation token passed to every command execution and to <c>ClearRowsAsync</c>.</param>
private async Task ReleaseLeasesAsync(MySqlConnection connection, CancellationToken token)
{
    if (this._rowsToRelease.Count > 0)
    {
        // Computed once up front so that every retry attempt uses the same candidate value.
        string newLastPolledTime = this.RecomputeLastPolledTime();
        bool retrySucceeded = false;

        for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && !retrySucceeded; retryCount++)
        {
            using (MySqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
            {
                try
                {
                    // Release the leases held on "_rowsToRelease".
                    using (MySqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction))
                    {
                        this._logger.LogDebug("Releasing lease ...");
                        await releaseLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                    }

                    // Count unprocessed changes where the update was done before 'newLastPolledTime'.
                    long unprocessedChangesCount;
                    using (MySqlCommand countUnprocessedChangesCommand = this.BuildCountUnprocessedChanges(connection, transaction, newLastPolledTime))
                    {
                        // NOTE(review): the direct unboxing assumes the scalar result is a boxed Int64 — confirm
                        // against the query built by BuildCountUnprocessedChanges.
                        unprocessedChangesCount = (long)await countUnprocessedChangesCommand.ExecuteScalarAsyncWithLogging(this._logger, token);
                        this._logger.LogDebug($"Unprocessed count is {unprocessedChangesCount} before {newLastPolledTime}");
                    }

                    // The last polled time is only advanced (and the lease table only cleaned) when nothing
                    // is left unprocessed before the new value.
                    if (unprocessedChangesCount == 0)
                    {
                        using (MySqlCommand updateLastPollingTimeCommand = this.BuildUpdateGlobalStateTableLastPollingTime(connection, transaction, newLastPolledTime))
                        {
                            await updateLastPollingTimeCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                            this._logger.LogDebug($"Updated {GlobalStateTableLastPolledTimeColumnName} to {newLastPolledTime}");
                        }

                        using (MySqlCommand deleteProcessedChangesCommand = this.BuildDeleteProcessedChangesInLeaseTable(connection, transaction))
                        {
                            int rowsUpdated = await deleteProcessedChangesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                            this._logger.LogDebug($"Total {rowsUpdated} rows cleaned from the Lease Table");
                        }
                    }

                    transaction.Commit();
                    retrySucceeded = true;

                    // Only forget the rows once the release has actually been committed.
                    this._rowsToRelease = new List<IReadOnlyDictionary<string, object>>();
                }
                catch (Exception ex)
                {
                    // The exception object is passed to the logger so the stack trace and inner
                    // exceptions are preserved; the message text itself is unchanged.
                    if (retryCount < MaxRetryReleaseLeases)
                    {
                        this._logger.LogError(ex, $"Failed to execute MySQL commands to release leases in attempt: {retryCount} for the specified table due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                    }
                    else
                    {
                        this._logger.LogError(ex, $"Failed to release leases for the specified table after {MaxRetryReleaseLeases} attempts due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                    }

                    try
                    {
                        transaction.Rollback();
                    }
                    catch (Exception ex2)
                    {
                        // A failed rollback is logged but deliberately not rethrown, so the retry loop
                        // and the final ClearRowsAsync call still run.
                        this._logger.LogError(ex2, $"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
                    }
                }
            }
        }
    }

    await this.ClearRowsAsync(token);
}