private async Task GetTableChangesAsync()

in src/TriggerBinding/SqlTableChangeMonitor.cs [277:389]


        private async Task GetTableChangesAsync(SqlConnection connection, CancellationToken token)
        {
            try
            {
                var transactionSw = Stopwatch.StartNew();
                long setLastSyncVersionDurationMs = 0L, getChangesDurationMs = 0L, acquireLeasesDurationMs = 0L;

                using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
                {
                    try
                    {
                        // Update the version number stored in the global state table if necessary before using it.
                        using (SqlCommand updateTablesPreInvocationCommand = this.BuildUpdateTablesPreInvocation(connection, transaction))
                        {
                            var commandSw = Stopwatch.StartNew();
                            object result = await updateTablesPreInvocationCommand.ExecuteScalarAsyncWithLogging(this._logger, token, true);
                            if (result != null)
                            {
                                // If we updated the LastSyncVersion we'll get a message back from the query, so log it here
                                this._logger.LogDebug($"[PreInvocation] {result}");
                            }
                            setLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds;
                        }

                        var rows = new List<IReadOnlyDictionary<string, object>>();

                        // Use the version number to query for new changes.
                        using (SqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
                        {
                            var commandSw = Stopwatch.StartNew();

                            using (SqlDataReader reader = getChangesCommand.ExecuteReader())
                            {
                                while (reader.Read())
                                {
                                    token.ThrowIfCancellationRequested();
                                    rows.Add(SqlBindingUtilities.BuildDictionaryFromSqlRow(reader));
                                }
                            }

                            getChangesDurationMs = commandSw.ElapsedMilliseconds;
                        }
                        // Also get the number of rows that currently have lease locks on them
                        // or are skipped because they have reached their max attempt count.
                        // This can help with supportability by allowing a customer to see when a
                        // trigger was processed successfully but returned fewer rows than expected.
                        string leaseLockedOrMaxAttemptRowCountMessage = await this.GetLeaseLockedOrMaxAttemptRowCountMessage(connection, transaction, token);
                        if (rows.Count > 0 || leaseLockedOrMaxAttemptRowCountMessage != null)
                        {
                            this._logger.LogDebug($"Executed GetChangesCommand in GetTableChangesAsync. {rows.Count} available changed rows. {leaseLockedOrMaxAttemptRowCountMessage}");
                        }
                        // If changes were found, acquire leases on them.
                        if (rows.Count > 0)
                        {
                            using (SqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows))
                            {
                                var commandSw = Stopwatch.StartNew();
                                await acquireLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
                                acquireLeasesDurationMs = commandSw.ElapsedMilliseconds;
                            }

                            // Only send event if we got changes to reduce the overall number of events sent since we generally
                            // only care about the times that we had to actually retrieve and process rows
                            var measures = new Dictionary<TelemetryMeasureName, double>
                            {
                                [TelemetryMeasureName.SetLastSyncVersionDurationMs] = setLastSyncVersionDurationMs,
                                [TelemetryMeasureName.GetChangesDurationMs] = getChangesDurationMs,
                                [TelemetryMeasureName.AcquireLeasesDurationMs] = acquireLeasesDurationMs,
                                [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
                                [TelemetryMeasureName.BatchCount] = this._rowsToProcess.Count,
                            };
                            TelemetryInstance.TrackEvent(TelemetryEventName.GetChanges, this._telemetryProps, measures);
                        }

                        transaction.Commit();

                        // Set the rows for processing, now since the leases are acquired.
                        await this._rowsToProcessLock.WaitAsync(token);
                        this._rowsToProcess = rows;
                        this._state = State.ProcessingChanges;
                        this._rowsToProcessLock.Release();
                    }
                    catch (Exception)
                    {
                        try
                        {
                            transaction.Rollback();
                        }
                        catch (Exception ex)
                        {
                            this._logger.LogError($"Failed to rollback transaction due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                            TelemetryInstance.TrackException(TelemetryErrorName.GetChangesRollback, ex, this._telemetryProps);
                        }
                        throw;
                    }
                }
            }
            catch (Exception e)
            {
                // If there's an exception in any part of the process, we want to clear all of our data in memory and
                // retry checking for changes again.
                await this._rowsToProcessLock.WaitAsync(token);
                this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
                this._rowsToProcessLock.Release();
                this._logger.LogError($"Failed to check for changes in table '{this._userTable.FullName}' due to exception: {e.GetType()}. Exception message: {e.Message}");
                TelemetryInstance.TrackException(TelemetryErrorName.GetChanges, e, this._telemetryProps);
                if (e.IsFatalSqlException() || connection.IsBrokenOrClosed())
                {
                    // If we get a fatal SQL Client exception or the connection is broken let it bubble up so we can try to re-establish the connection
                    throw;
                }
            }
        }