private async Task ProcessTableChangesAsync(CancellationToken token)

in src/TriggersBinding/MySqlTableChangeMonitor.cs [326:378]


        /// <summary>
        /// Composes the trigger payload from the rows currently held in <c>_rowsToProcess</c>, invokes the
        /// triggered function with it, and then moves the monitor to <c>State.Cleanup</c>.
        /// </summary>
        /// <remarks>
        /// <para>If there are no rows to process, or composing the payload throws, the rows are cleared via
        /// <c>ClearRowsAsync</c> and the function is not invoked.</para>
        /// <para>If the function succeeds, the processed rows are handed over to <c>_rowsToRelease</c> and
        /// <c>_rowsToProcess</c> is replaced with an empty list. If it fails, the failure is logged and the
        /// row collections are left untouched. In both cases the state is set to <c>State.Cleanup</c>.</para>
        /// </remarks>
        /// <param name="token">
        /// Cancellation token used when clearing rows and when waiting on the rows lock. The function
        /// invocation itself uses the token of <c>_cancellationTokenSourceExecutor</c> instead.
        /// </param>
        /// <returns>A task representing the asynchronous processing operation.</returns>
        private async Task ProcessTableChangesAsync(CancellationToken token)
        {
            if (this._rowsToProcess.Count == 0)
            {
                // This ideally should never happen, but as a safety measure: if we were asked to process
                // changes and there are none, still make sure everything is reset to a clean state.
                await this.ClearRowsAsync(token);
                return;
            }

            IReadOnlyList<MySqlChange<T>> changes;
            try
            {
                changes = this.ProcessChanges();
            }
            catch (Exception e)
            {
                // Either there's a bug or we're in a bad state, so there is not much we can do here. Clear our
                // state and retry getting the changes from the top in case something broke while fetching them.
                // Retrying immediately makes no sense since this isn't a connection-based issue. We could send up
                // the changes we were able to process and skip the rest, but since this is not expected during
                // normal execution we err on the side of caution and retry the whole set of changes.
                // The exception object is passed to the logger so the stack trace is not lost.
                this._logger.LogError(e, $"Failed to compose trigger parameter value for the specified table due to exception: {e.GetType()}. Exception message: {e.Message}");
                await this.ClearRowsAsync(token);
                return;
            }

            if (changes == null)
            {
                // Nothing to send; leave the current state and rows untouched.
                return;
            }

            var input = new TriggeredFunctionData() { TriggerValue = changes };
            FunctionResult result = await this._executor.TryExecuteAsync(input, this._cancellationTokenSourceExecutor.Token);

            if (result.Succeeded)
            {
                this._logger.LogInformation("Function Trigger executed successfully.");

                // Acquire outside the try block so that a cancelled/failed wait never triggers a Release()
                // for a lock we do not hold; once acquired, the finally block guarantees it is released.
                await this._rowsLock.WaitAsync(token);
                try
                {
                    this._rowsToRelease = this._rowsToProcess;
                    this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
                }
                finally
                {
                    this._rowsLock.Release();
                }
            }
            else
            {
                this._logger.LogError($"Exception encountered while executing the Function Trigger. Exception: {result.Exception}");
            }

            this._state = State.Cleanup;
        }