private async Task ProcessTableChangesAsync()

in src/TriggerBinding/SqlTableChangeMonitor.cs [391:454]


        /// <summary>
        /// Composes the trigger value from the rows currently held in <c>_rowsToProcess</c>, invokes the user
        /// function with it and, if the invocation succeeds, hands those rows over to <c>_rowsToRelease</c>.
        /// </summary>
        /// <remarks>
        /// If there are no rows to process, or composing the trigger value throws, <c>ClearRowsAsync</c> is called
        /// and the function is not invoked. Once the function has been invoked the monitor state is set to
        /// <c>State.Cleanup</c> whether or not the invocation succeeded.
        /// </remarks>
        /// <param name="token">
        /// Cancellation token passed to change processing, row clearing and lock acquisition. The function
        /// invocation itself is given the token of <c>_cancellationTokenSourceExecutor</c> instead.
        /// </param>
        private async Task ProcessTableChangesAsync(CancellationToken token)
        {
            if (this._rowsToProcess.Count == 0)
            {
                // This ideally should never happen, but as a safety measure ensure that if we tried to process changes
                // but there weren't any we still ensure everything is reset to a clean state
                await this.ClearRowsAsync(token);
                return;
            }

            IReadOnlyList<SqlChange<T>> changes = null;

            try
            {
                changes = await this.ProcessChanges(token);
            }
            catch (Exception e)
            {
                // Either there's a bug or we're in a bad state so not much we can do here. We'll try clearing
                // our state and retry getting the changes from the top again in case something broke while
                // fetching the changes.
                // It doesn't make sense to retry processing the changes immediately since this isn't a connection-based issue.
                // We could probably send up the changes we were able to process and just skip the ones we couldn't, but given
                // that this is not a case we expect would happen during normal execution we'll err on the side of caution for
                // now and just retry getting the whole set of changes.
                // The exception itself is passed to the logger so the stack trace is not lost.
                this._logger.LogError(e, $"Failed to compose trigger parameter value for table: '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}");
                TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, e, this._telemetryProps);
                await this.ClearRowsAsync(token);
            }

            if (changes == null)
            {
                // Nothing to send to the function (composing the trigger value failed or produced no value).
                return;
            }

            var input = new TriggeredFunctionData() { TriggerValue = changes };

            var stopwatch = Stopwatch.StartNew();

            FunctionResult result = await this._executor.TryExecuteAsync(input, this._cancellationTokenSourceExecutor.Token);
            long durationMs = stopwatch.ElapsedMilliseconds;
            var measures = new Dictionary<TelemetryMeasureName, double>
            {
                [TelemetryMeasureName.DurationMs] = durationMs,
                [TelemetryMeasureName.BatchCount] = this._rowsToProcess.Count,
            };

            if (result.Succeeded)
            {
                // We've successfully fully processed these so set them to be released in the cleanup phase
                await this._rowsToProcessLock.WaitAsync(token);
                try
                {
                    this._rowsToRelease = this._rowsToProcess;
                    this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
                }
                finally
                {
                    // Always release so a failure while swapping the lists can't leave the lock held forever.
                    this._rowsToProcessLock.Release();
                }
            }

            // On failure the rows are intentionally left in _rowsToProcess. In the future it might make sense
            // to retry executing the function, but for now we just let another worker try.
            TelemetryInstance.TrackEvent(
                TelemetryEventName.TriggerFunction,
                new Dictionary<TelemetryPropertyName, string>(this._telemetryProps) {
                    { TelemetryPropertyName.Succeeded, result.Succeeded.ToString() },
                },
                measures);
            this._state = State.Cleanup;
        }