in src/TriggersBinding/MySqlTableChangeMonitor.cs [440:509]
/// <summary>
/// Renews the leases for the rows currently being processed and, once the leases have been renewed
/// <c>MaxLeaseRenewalCount</c> times, cancels the in-flight function execution.
/// </summary>
/// <param name="connection">Connection on which the transaction is opened and the renew-leases command is executed.</param>
/// <param name="token">Token observed while waiting for the rows lock and while executing the command.</param>
/// <remarks>
/// Nothing is done unless the monitor is in the <c>ProcessingChanges</c> state and there are rows to process.
/// Exceptions raised while executing or committing the command are logged and swallowed (after a
/// best-effort rollback). The rows lock is always released before this method returns or throws.
/// </remarks>
private async Task RenewLeasesAsync(MySqlConnection connection, CancellationToken token)
{
    await this._rowsLock.WaitAsync(token);
    try
    {
        if (this._state == State.ProcessingChanges && this._rowsToProcess.Count > 0)
        {
            // Use a transaction to automatically release the app lock when we're done executing the query
            using (MySqlTransaction transaction = connection.BeginTransaction())
            {
                try
                {
                    using (MySqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection, transaction))
                    {
                        this._logger.LogDebug("Renewing lease ...");
                        int rowsAffected = await renewLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                        if (rowsAffected > 0)
                        {
                            // Only send an event if we actually updated rows to reduce the overall number of events we send
                            this._logger.LogInformation("Updated the Leases table");
                        }
                        transaction.Commit();
                        this._logger.LogDebug($"The lease expiration time renewed by {LeaseRenewalIntervalInSeconds} seconds, for the rows(under process) in the lease table");
                    }
                }
                catch (Exception e)
                {
                    // A failed renewal is deliberately not rethrown. If we fail to renew the leases, multiple
                    // workers could be processing the same change data, but we have functionality in place
                    // to deal with this (see design doc).
                    this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}");
                    try
                    {
                        transaction.Rollback();
                    }
                    catch (Exception e2)
                    {
                        this._logger.LogError($"RenewLeases - Failed to rollback transaction due to exception: {e2.GetType()}. Exception message: {e2.Message}");
                    }
                }
                finally
                {
                    // Do we want to update this count even in the case of a failure to renew the leases? Probably,
                    // because the count is simply meant to indicate how much time the other thread has spent processing
                    // changes essentially.
                    this._leaseRenewalCount += 1;
                    // If this thread has been cancelled, then the _cancellationTokenSourceExecutor could have already
                    // been disposed so shouldn't cancel it.
                    if (this._leaseRenewalCount == MaxLeaseRenewalCount && !token.IsCancellationRequested)
                    {
                        this._logger.LogWarning("Call to execute the function (TryExecuteAsync) seems to be stuck, so it is being cancelled");
                        // If we keep renewing the leases, the thread responsible for processing the changes is stuck.
                        // If it's stuck, it has to be stuck in the function execution call (I think), so we should
                        // cancel the call.
                        this._cancellationTokenSourceExecutor.Cancel();
                    }
                }
            }
        }
    }
    finally
    {
        // Always release the lock, even if opening or disposing the transaction throws; otherwise every
        // other waiter on _rowsLock would block forever.
        this._rowsLock.Release();
    }
}