private async Task ReleaseLeasesAsync()

in src/TriggerBinding/SqlTableChangeMonitor.cs [621:704]


        /// <summary>
        /// Releases the leases held on the rows in "_rowsToRelease" and runs the post-invocation table update,
        /// executing both commands inside a single RepeatableRead transaction. When an attempt throws, the
        /// failure is logged and tracked, the transaction is rolled back, and the attempt is repeated until
        /// MaxRetryReleaseLeases attempts have been made. Once the loop finishes (or when there is nothing to
        /// release) <c>ClearRowsAsync</c> is awaited.
        /// </summary>
        /// <remarks>
        /// Exceptions thrown by the SQL commands, the commit, or the success telemetry are caught and are not
        /// rethrown, so exhausting all attempts does not surface an exception to the caller. An exception thrown
        /// by <c>BeginTransaction</c> itself is outside the try block and does propagate; in that case
        /// <c>ClearRowsAsync</c> is not reached.
        /// </remarks>
        /// <param name="connection">The connection used to begin the transaction and to build both commands.</param>
        /// <param name="token">Cancellation token passed to the SQL command executions and to <c>ClearRowsAsync</c>.</param>
        /// <returns>A task that completes after the release attempts have finished and <c>ClearRowsAsync</c> has run.</returns>
        private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToken token)
        {
            if (this._rowsToRelease.Count > 0)
            {
                // Computed once, before the first attempt, and reused by every retry.
                long newLastSyncVersion = this.RecomputeLastSyncVersion();
                bool retrySucceeded = false;

                for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && !retrySucceeded; retryCount++)
                {
                    var transactionSw = Stopwatch.StartNew();
                    long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L;

                    // Each attempt gets its own transaction. Note that BeginTransaction is not covered by the
                    // try block below, so a failure to start the transaction is neither retried nor swallowed.
                    using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
                    {
                        try
                        {
                            // Release the leases held on "_rowsToRelease".
                            using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction))
                            {
                                var commandSw = Stopwatch.StartNew();
                                // The affected-row count is not needed here, so the result is not stored.
                                await releaseLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token, true);
                                releaseLeasesDurationMs = commandSw.ElapsedMilliseconds;
                            }

                            // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion,
                            // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion.
                            using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion))
                            {
                                var commandSw = Stopwatch.StartNew();
                                object result = await updateTablesPostInvocationCommand.ExecuteScalarAsyncWithLogging(this._logger, token);
                                // Sample the duration right after the command completes so that the time spent
                                // logging below is not counted, matching how the release-leases duration is taken.
                                updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds;
                                if (result != null)
                                {
                                    // If we updated the LastSyncVersion we'll get a message back from the query, so log it here
                                    this._logger.LogDebug($"[PostInvocation] {result}");
                                }
                            }
                            transaction.Commit();

                            var measures = new Dictionary<TelemetryMeasureName, double>
                            {
                                [TelemetryMeasureName.ReleaseLeasesDurationMs] = releaseLeasesDurationMs,
                                [TelemetryMeasureName.UpdateLastSyncVersionDurationMs] = updateLastSyncVersionDurationMs,
                                [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
                            };

                            TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeases, this._telemetryProps, measures);
                            // Setting the flag ends the retry loop; the released rows are dropped from the pending list.
                            retrySucceeded = true;
                            this._rowsToRelease = new List<IReadOnlyDictionary<string, object>>();
                        }
                        catch (Exception ex)
                        {
                            if (retryCount < MaxRetryReleaseLeases)
                            {
                                // More attempts remain: record which attempt failed, then fall through to the rollback.
                                this._logger.LogError($"Failed to execute SQL commands to release leases in attempt: {retryCount} for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}");

                                var measures = new Dictionary<TelemetryMeasureName, double>
                                {
                                    [TelemetryMeasureName.RetryAttemptNumber] = retryCount,
                                };

                                TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps, measures);
                            }
                            else
                            {
                                // Final attempt failed: the exception is logged and tracked but not rethrown.
                                this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' after {MaxRetryReleaseLeases} attempts due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                                TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesNoRetriesLeft, ex, this._telemetryProps);
                            }

                            try
                            {
                                transaction.Rollback();
                            }
                            catch (Exception ex2)
                            {
                                // A rollback failure is only logged and tracked so that the retry loop can continue.
                                this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
                                TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps);
                            }
                        }
                    }
                }
            }
            // Reached both when there was nothing to release and after the retry loop, successful or not.
            await this.ClearRowsAsync(token);
        }