in src/TriggerBinding/SqlTableChangeMonitor.cs [934:972]
private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, object>> rows)
{
    // Builds (but does not execute) a MERGE command against the leases table, bound to the given
    // connection and transaction. The rows are serialized to JSON and handed to the query as a single
    // parameter, which the query expands back into a rowset with OPENJSON. For every incoming row:
    //   - if a lease row with the same primary key already exists, its change version is set to the
    //     incoming change version, its attempt count is incremented by one and its lease expiration
    //     time is set to LeaseIntervalInSeconds from the server's current time;
    //   - otherwise a new lease row is inserted with an attempt count of 1.
    // NOTE(review): each row dictionary presumably carries the primary key columns plus the
    // change-version column consumed by the OPENJSON schema below - confirm against the caller.

    // Serialize up front so that a serialization failure cannot leave an undisposed SqlCommand behind.
    string rowData = Utils.JsonSerializeObject(rows);

    // Materialized once because the names are needed both for the ON clause and for the INSERT values.
    List<string> bracketedPrimaryKeys = this._primaryKeyColumns
        .Select(p => p.name.AsBracketQuotedString())
        .ToList();

    // The column definitions to use for the CTE
    IEnumerable<string> cteColumnDefinitions = this._primaryKeyColumns
        .Select(c => $"{c.name.AsBracketQuotedString()} {c.type}")
        // These are the internal column values that we use. Note that we use SYS_CHANGE_VERSION because that's
        // the new version - the _az_func_ChangeVersion has the old version
        .Concat(new string[] { $"{SysChangeVersionColumnName} bigint", $"{LeasesTableAttemptCountColumnName} int" });

    // Create the query that the merge statement will match the rows on
    string primaryKeyMatchingQuery = string.Join(" AND ", bracketedPrimaryKeys.Select(key => $"ExistingData.{key} = NewData.{key}"));
    const string acquireLeasesCte = "acquireLeasesCte";
    // Single source of truth for the parameter name: used both in the query text and when binding the value.
    const string rowDataParameter = "@rowData";

    // Create the merge query that will either update the rows that already exist or insert a new one if it doesn't exist
    string query = $@"
{AppLockStatements}
WITH {acquireLeasesCte} AS ( SELECT * FROM OPENJSON({rowDataParameter}) WITH ({string.Join(",", cteColumnDefinitions)}) )
MERGE INTO {this._bracketedLeasesTableName}
AS ExistingData
USING {acquireLeasesCte}
AS NewData
ON
{primaryKeyMatchingQuery}
WHEN MATCHED THEN
UPDATE SET
{LeasesTableChangeVersionColumnName} = NewData.{SysChangeVersionColumnName},
{LeasesTableAttemptCountColumnName} = ExistingData.{LeasesTableAttemptCountColumnName} + 1,
{LeasesTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
WHEN NOT MATCHED THEN
INSERT VALUES ({string.Join(",", bracketedPrimaryKeys.Select(k => $"NewData.{k}"))}, NewData.{SysChangeVersionColumnName}, 1, DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()));";

    var command = new SqlCommand(query, connection, transaction);
    // A size of -1 requests the MAX variant of NVARCHAR, so the JSON payload is not truncated.
    SqlParameter par = command.Parameters.Add(rowDataParameter, SqlDbType.NVarChar, -1);
    par.Value = rowData;
    return command;
}