private SqlCommand BuildAcquireLeasesCommand()

in src/TriggerBinding/SqlTableChangeMonitor.cs [934:972]


        /// <summary>
        /// Builds the command that acquires leases on the given rows by merging them into the leases table.
        /// The rows are serialized to JSON, exposed to the query through OPENJSON as a CTE and matched against
        /// the leases table on the primary key columns. Matching lease rows get their change version set to the
        /// incoming <see cref="SysChangeVersionColumnName"/> value, their attempt count incremented and their
        /// lease expiration time set to <see cref="LeaseIntervalInSeconds"/> seconds from now; rows without a
        /// match are inserted.
        /// </summary>
        /// <param name="connection">The connection the command is created on.</param>
        /// <param name="transaction">The transaction the command is enlisted in.</param>
        /// <param name="rows">The rows to acquire leases on. They are JSON-serialized and passed as the @rowData parameter.</param>
        /// <returns>The command, with its @rowData parameter populated. It is returned undisposed, so the caller owns it.</returns>
        private SqlCommand BuildAcquireLeasesCommand(SqlConnection connection, SqlTransaction transaction, IReadOnlyList<IReadOnlyDictionary<string, object>> rows)
        {
            const string acquireLeasesCte = "acquireLeasesCte";
            const string rowDataParameter = "@rowData";

            // Serialize first so that a serialization failure can't leave an undisposed SqlCommand behind.
            string rowData = Utils.JsonSerializeObject(rows);

            // Materialized once since the bracketed names are used for both the ON clause and the INSERT list.
            string[] bracketedPrimaryKeys = this._primaryKeyColumns
                .Select(p => p.name.AsBracketQuotedString())
                .ToArray();

            // The column definitions to use for the CTE
            string[] cteColumnDefinitions = this._primaryKeyColumns
                .Select(c => $"{c.name.AsBracketQuotedString()} {c.type}")
                // These are the internal column values that we use. Note that we use SYS_CHANGE_VERSION because that's
                // the new version - the _az_func_ChangeVersion has the old version
                .Concat(new string[] { $"{SysChangeVersionColumnName} bigint", $"{LeasesTableAttemptCountColumnName} int" })
                .ToArray();

            // Create the query that the merge statement will match the rows on
            string primaryKeyMatchingQuery = string.Join(" AND ", bracketedPrimaryKeys.Select(key => $"ExistingData.{key} = NewData.{key}"));
            string insertPrimaryKeyValues = string.Join(",", bracketedPrimaryKeys.Select(k => $"NewData.{k}"));

            // Create the merge query that will either update the rows that already exist or insert a new one if it doesn't exist.
            // NOTE(review): INSERT VALUES has no column list, so it relies on the leases table columns being ordered as
            // primary keys, change version, attempt count, lease expiration time - confirm against the table creation script.
            string query = $@"
                    {AppLockStatements}

                    WITH {acquireLeasesCte} AS ( SELECT * FROM OPENJSON({rowDataParameter}) WITH ({string.Join(",", cteColumnDefinitions)}) )
                    MERGE INTO {this._bracketedLeasesTableName}
                        AS ExistingData
                    USING {acquireLeasesCte}
                        AS NewData
                    ON
                        {primaryKeyMatchingQuery}
                    WHEN MATCHED THEN
                        UPDATE SET
                        {LeasesTableChangeVersionColumnName} = NewData.{SysChangeVersionColumnName},
                        {LeasesTableAttemptCountColumnName} = ExistingData.{LeasesTableAttemptCountColumnName} + 1,
                        {LeasesTableLeaseExpirationTimeColumnName} = DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME())
                    WHEN NOT MATCHED THEN
                        INSERT VALUES ({insertPrimaryKeyValues}, NewData.{SysChangeVersionColumnName}, 1, DATEADD(second, {LeaseIntervalInSeconds}, SYSDATETIME()));";

            var command = new SqlCommand(query, connection, transaction);
            // A size of -1 makes the parameter nvarchar(max) so the JSON payload isn't truncated.
            SqlParameter par = command.Parameters.Add(rowDataParameter, SqlDbType.NVarChar, -1);
            par.Value = rowData;
            return command;
        }