in src/TriggerBinding/SqlTableChangeMonitor.cs [872:924]
private async Task<string> GetLeaseLockedOrMaxAttemptRowCountMessage(SqlConnection connection, SqlTransaction transaction, CancellationToken token)
{
string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));
// Get the count of changes from CHANGETABLE that meet the following criteria:
// Lease locked:
// * Have attempts remaining (Attempt count < Max attempts)
// * NOT NULL LeaseExpirationTime
// * LeaseExpirationTime > Current Time
// Max Attempts reached:
// * NULL LeaseExpirationTime OR LeaseExpirationTime <= Current Time
// * No attempts remaining (Attempt count = Max attempts)
string getLeaseLockedOrMaxAttemptRowCountQuery = $@"
{AppLockStatements}
DECLARE @last_sync_version bigint;
SELECT @last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};
DECLARE @lease_locked_count int;
DECLARE @max_attempts_count int;
SELECT
@lease_locked_count = COUNT(CASE WHEN l.{LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount} AND l.{LeasesTableLeaseExpirationTimeColumnName} IS NOT NULL AND l.{LeasesTableLeaseExpirationTimeColumnName} > SYSDATETIME() THEN 1 ELSE NULL END),
@max_attempts_count = COUNT(CASE WHEN (l.{LeasesTableLeaseExpirationTimeColumnName} IS NULL OR l.{LeasesTableLeaseExpirationTimeColumnName} <= SYSDATETIME()) AND l.{LeasesTableAttemptCountColumnName} = {MaxChangeProcessAttemptCount} THEN 1 ELSE NULL END)
FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c
LEFT OUTER JOIN {this._bracketedLeasesTableName} AS l ON {leasesTableJoinCondition};
IF @lease_locked_count > 0 OR @max_attempts_count > 0
BEGIN
SELECT '(' + CAST(@lease_locked_count AS NVARCHAR) + ' found with lease locks and ' + CAST(@max_attempts_count AS NVARCHAR) + ' ignored because they''ve reached the max attempt limit)';
END";
try
{
using (var getLeaseLockedOrMaxAttemptsRowCountCommand = new SqlCommand(getLeaseLockedOrMaxAttemptRowCountQuery, connection, transaction))
{
return (await getLeaseLockedOrMaxAttemptsRowCountCommand.ExecuteScalarAsyncWithLogging(this._logger, token))?.ToString();
}
}
catch (Exception ex)
{
this._logger.LogError($"Failed to query count of lease locked or max attempt changes for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.GetLeaseLockedOrMaxAttemptRowCount, ex);
// This is currently only used for debugging, so ignore the exception if we can. If the error is a fatal one though then the connection or transaction will be
// unusable so we have to let this bubble up so we can attempt to reconnect
if (ex.IsFatalSqlException() || ex.IsDeadlockException() || connection.IsBrokenOrClosed())
{
throw;
}
else
{
// If it's non-fatal though return null instead of throwing since it isn't necessary to get the value
return null;
}
}
}