in src/TriggerBinding/SqlTableChangeMonitor.cs [621:704]
private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToken token)
{
    // Releases the leases held on "_rowsToRelease" and then runs the post-invocation update of the
    // state tables, both inside one RepeatableRead transaction on the supplied connection.
    //
    // An attempt whose commands or commit throw is logged, reported to telemetry, rolled back and
    // retried, up to MaxRetryReleaseLeases attempts in total; those exceptions are not rethrown.
    // BeginTransaction is called outside the try block, so an exception from it propagates to the caller.
    // ClearRowsAsync is awaited at the end whether or not there were rows to release and whether or not
    // the release succeeded.
    if (this._rowsToRelease.Count > 0)
    {
        long newLastSyncVersion = this.RecomputeLastSyncVersion();
        bool retrySucceeded = false;
        for (int retryCount = 1; retryCount <= MaxRetryReleaseLeases && !retrySucceeded; retryCount++)
        {
            var transactionSw = Stopwatch.StartNew();
            long releaseLeasesDurationMs = 0L, updateLastSyncVersionDurationMs = 0L;
            using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
            {
                try
                {
                    // Release the leases held on "_rowsToRelease".
                    using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction))
                    {
                        var commandSw = Stopwatch.StartNew();
                        // The affected-row count is not needed here, so the result is not stored.
                        await releaseLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token, true);
                        releaseLeasesDurationMs = commandSw.ElapsedMilliseconds;
                    }
                    // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion,
                    // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion.
                    using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion))
                    {
                        var commandSw = Stopwatch.StartNew();
                        object result = await updateTablesPostInvocationCommand.ExecuteScalarAsyncWithLogging(this._logger, token);
                        if (result != null)
                        {
                            // If we updated the LastSyncVersion we'll get a message back from the query, so log it here
                            this._logger.LogDebug($"[PostInvocation] {result}");
                        }
                        updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds;
                    }
                    transaction.Commit();
                    // Record success and reset the pending list as soon as the commit has gone through, so that
                    // an exception raised afterwards (for example while reporting telemetry) cannot cause the
                    // already-committed work to be executed again by another iteration of the retry loop.
                    retrySucceeded = true;
                    this._rowsToRelease = new List<IReadOnlyDictionary<string, object>>();
                    var measures = new Dictionary<TelemetryMeasureName, double>
                    {
                        [TelemetryMeasureName.ReleaseLeasesDurationMs] = releaseLeasesDurationMs,
                        [TelemetryMeasureName.UpdateLastSyncVersionDurationMs] = updateLastSyncVersionDurationMs,
                        [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
                    };
                    TelemetryInstance.TrackEvent(TelemetryEventName.ReleaseLeases, this._telemetryProps, measures);
                }
                catch (Exception ex)
                {
                    if (retryCount < MaxRetryReleaseLeases)
                    {
                        // Another attempt will follow; record which attempt failed.
                        this._logger.LogError($"Failed to execute SQL commands to release leases in attempt: {retryCount} for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                        var measures = new Dictionary<TelemetryMeasureName, double>
                        {
                            [TelemetryMeasureName.RetryAttemptNumber] = retryCount,
                        };
                        TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeases, ex, this._telemetryProps, measures);
                    }
                    else
                    {
                        // This was the final attempt; report that no retries are left.
                        this._logger.LogError($"Failed to release leases for table '{this._userTable.FullName}' after {MaxRetryReleaseLeases} attempts due to exception: {ex.GetType()}. Exception message: {ex.Message}");
                        TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesNoRetriesLeft, ex, this._telemetryProps);
                    }
                    try
                    {
                        transaction.Rollback();
                    }
                    catch (Exception ex2)
                    {
                        // A failed rollback is logged and reported but does not stop the retry loop.
                        this._logger.LogError($"Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
                        TelemetryInstance.TrackException(TelemetryErrorName.ReleaseLeasesRollback, ex2, this._telemetryProps);
                    }
                }
            }
        }
    }
    await this.ClearRowsAsync(token);
}