private SqlCommand BuildGetChangesCommand()

in src/TriggerBinding/SqlTableChangeMonitor.cs [819:862]


        private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransaction transaction)
        {
            string selectList = string.Join(", ", this._userTableColumns.Select(col => this._primaryKeyColumns.Select(c => c.name).Contains(col) ? $"c.{col.AsBracketQuotedString()}" : $"u.{col.AsBracketQuotedString()}"));
            string userTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = u.{col.name.AsBracketQuotedString()}"));
            string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));

            // Get the list of changes from CHANGETABLE that meet the following criteria:
            // * Null LeaseExpirationTime AND (Null ChangeVersion OR ChangeVersion < Current change version for that row from CHANGETABLE)
            //   OR
            // * LeaseExpirationTime < Current Time
            //
            // The LeaseExpirationTime is only used for rows currently being processed - so if we see a
            // row whose lease has expired that means that something must have happened to the function
            // processing it before it was able to complete successfully. In that case we want to pick it
            // up regardless since we know it should be processed - no need to check the change version.
            // Once a row is successfully processed the LeaseExpirationTime column is set to NULL.
            string getChangesQuery = $@"
                {AppLockStatements}

                DECLARE @last_sync_version bigint;
                SELECT @last_sync_version = LastSyncVersion
                FROM {GlobalStateTableName}
                WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};

                SELECT TOP {this._maxBatchSize}
                    {selectList},
                    c.{SysChangeVersionColumnName},
                    c.SYS_CHANGE_OPERATION,
                    l.{LeasesTableChangeVersionColumnName},
                    l.{LeasesTableAttemptCountColumnName},
                    l.{LeasesTableLeaseExpirationTimeColumnName}
                FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c
                LEFT OUTER JOIN {this._bracketedLeasesTableName} AS l ON {leasesTableJoinCondition}
                LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCondition}
                WHERE
                    (l.{LeasesTableLeaseExpirationTimeColumnName} IS NULL AND
                       (l.{LeasesTableChangeVersionColumnName} IS NULL OR l.{LeasesTableChangeVersionColumnName} < c.{SysChangeVersionColumnName}) OR
                        l.{LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()
                    ) AND
                    (l.{LeasesTableAttemptCountColumnName} IS NULL OR l.{LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount})
                ORDER BY c.{SysChangeVersionColumnName} ASC;";

            return new SqlCommand(getChangesQuery, connection, transaction);
        }