private async Task ReleaseLeasesAsync()

in src/TriggersBinding/MySqlTableChangeMonitor.cs [552:627]


        /// <summary>
        /// Releases the leases held on the rows in "_rowsToRelease" and then clears the row state
        /// via <c>ClearRowsAsync</c>.
        /// </summary>
        /// <remarks>
        /// When "_rowsToRelease" is non-empty, each attempt runs the following inside one
        /// RepeatableRead transaction:
        /// 1. Execute the release-leases command.
        /// 2. Count the unprocessed changes made before the recomputed last polled time.
        /// 3. Only if that count is zero, execute the command that updates the last polled time in the
        ///    global state table and the command that deletes processed changes from the lease table.
        /// 4. Commit and reset "_rowsToRelease" to an empty list.
        /// A failed attempt is logged and rolled back, then retried until it succeeds or
        /// MaxRetryReleaseLeases attempts have been made. Retrying stops early when the failure was
        /// caused by <paramref name="token"/> being cancelled, because every later attempt would be
        /// cancelled as well. Exceptions thrown while beginning the transaction are not caught here.
        /// </remarks>
        /// <param name="connection">The open connection on which the transaction and commands are created.</param>
        /// <param name="token">Cancellation token passed to every command and to <c>ClearRowsAsync</c>.</param>
        private async Task ReleaseLeasesAsync(MySqlConnection connection, CancellationToken token)
        {
            if (this._rowsToRelease.Count > 0)
            {
                string newLastPolledTime = this.RecomputeLastPolledTime();
                bool retrySucceeded = false;

                for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && !retrySucceeded; retryCount++)
                {
                    using (MySqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
                    {
                        try
                        {
                            // Release the leases held on "_rowsToRelease".
                            using (MySqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction))
                            {
                                this._logger.LogDebug("Releasing lease ...");
                                await releaseLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                            }

                            // Count unprocessed changes where the update was done before 'newLastPolledTime'.
                            long unprocessedChangesCount;
                            using (MySqlCommand countUnprocessedChangesCommand = this.BuildCountUnprocessedChanges(connection, transaction, newLastPolledTime))
                            {
                                unprocessedChangesCount = (long)await countUnprocessedChangesCommand.ExecuteScalarAsyncWithLogging(this._logger, token);
                                this._logger.LogDebug($"Unprocessed count is {unprocessedChangesCount} before {newLastPolledTime}");
                            }

                            // The last polled time is only advanced, and the lease table only cleaned up,
                            // once nothing older than 'newLastPolledTime' is left unprocessed.
                            if (unprocessedChangesCount == 0)
                            {
                                using (MySqlCommand updateLastPollingTimeCommand = this.BuildUpdateGlobalStateTableLastPollingTime(connection, transaction, newLastPolledTime))
                                {
                                    await updateLastPollingTimeCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                                    this._logger.LogDebug($"Updated {GlobalStateTableLastPolledTimeColumnName} to {newLastPolledTime}");
                                }

                                using (MySqlCommand deleteProcessedChangesCommand = this.BuildDeleteProcessedChangesInLeaseTable(connection, transaction))
                                {
                                    int rowsCleaned = await deleteProcessedChangesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                                    this._logger.LogDebug($"Total {rowsCleaned} rows cleaned from the Lease Table");
                                }
                            }

                            transaction.Commit();

                            retrySucceeded = true;
                            this._rowsToRelease = new List<IReadOnlyDictionary<string, object>>();
                        }
                        catch (Exception ex)
                        {
                            // Pass the exception object to the logger so the stack trace is not lost.
                            if (retryCount < MaxRetryReleaseLeases)
                            {
                                this._logger.LogError(ex, $"Failed to execute MySQL commands to release leases in attempt: {retryCount} for the specified table due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                            }
                            else
                            {
                                this._logger.LogError(ex, $"Failed to release leases for the specified table after {MaxRetryReleaseLeases} attempts due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                            }

                            try
                            {
                                transaction.Rollback();
                            }
                            catch (Exception ex2)
                            {
                                this._logger.LogError(ex2, $"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
                            }

                            // A cancelled token never becomes un-cancelled, so any further attempt would
                            // fail the same way; stop retrying instead of opening more transactions.
                            if (ex is OperationCanceledException && token.IsCancellationRequested)
                            {
                                break;
                            }
                        }
                    }
                }
            }
            await this.ClearRowsAsync(token);
        }