private async Task GetTableChangesAsync()

in src/TriggersBinding/MySqlTableChangeMonitor.cs [244:324]


        private async Task GetTableChangesAsync(MySqlConnection connection, CancellationToken token)
        {
            try
            {
                var transactionSw = Stopwatch.StartNew();
                long getChangesDurationMs = 0L, acquireLeasesDurationMs = 0L;

                using (MySqlTransaction transaction = connection.BeginTransaction())
                {
                    try
                    {
                        var rows = new List<IReadOnlyDictionary<string, object>>();
                        // query for new changes.
                        using (MySqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
                        {
                            var commandSw = Stopwatch.StartNew();
                            this._logger.LogInformation($"Looking for latest changes on the configured table");

                            using (MySqlDataReader reader = getChangesCommand.ExecuteReaderWithLogging(this._logger))
                            {
                                while (reader.Read())
                                {
                                    token.ThrowIfCancellationRequested();
                                    rows.Add(MySqlBindingUtilities.BuildDictionaryFromMySqlRow(reader));
                                }
                            }
                            getChangesDurationMs = commandSw.ElapsedMilliseconds;
                        }

                        // If changes were found
                        if (rows.Count > 0)
                        {
                            this._logger.LogInformation($"The total no of rows found to process is {rows.Count}");

                            using (MySqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows))
                            {
                                var commandSw = Stopwatch.StartNew();
                                this._logger.LogDebug($"Acquiring lease ...");
                                await acquireLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                                acquireLeasesDurationMs = commandSw.ElapsedMilliseconds;
                            }
                        }

                        transaction.Commit();

                        // Set the rows for processing, now since the leases are acquired.
                        await this._rowsLock.WaitAsync(token);
                        this._rowsToProcess = rows;
                        this._state = State.ProcessingChanges;
                        this._rowsLock.Release();
                    }
                    catch (Exception)
                    {
                        try
                        {
                            transaction.Rollback();
                        }
                        catch (Exception ex)
                        {
                            this._logger.LogError($"Failed to rollback transaction due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                        }
                        throw;
                    }
                }
            }
            catch (Exception e)
            {
                // If there's an exception in any part of the process, we want to clear all of our data in memory and
                // retry checking for changes again.
                await this._rowsLock.WaitAsync(token);
                this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
                this._rowsLock.Release();

                this._logger.LogError($"Failed to check for changes in the specified table due to exception: {e.GetType()}. Exception message: {e.Message}");
                if (connection.IsBrokenOrClosed())      // TODO: e.IsFatalMySqlException() || - check mysql corresponding
                {
                    // If we get a fatal MySQL Client exception or the connection is broken let it bubble up so we can try to re-establish the connection
                    throw;
                }
            }
        }