private static void GenerateDataQueryForMerge()

in src/SqlAsyncCollector.cs [362:417]


        /// <summary>
        /// Builds the common table expression (CTE) that exposes the incoming rows to the MERGE statement,
        /// together with the JSON payload that the CTE reads through OPENJSON.
        /// </summary>
        /// <remarks>
        /// Rows are processed from last to first. When <typeparamref name="T"/> is not <see cref="JObject"/> and the
        /// table has no identity primary key columns, only the last row seen for each primary key is kept.
        /// In every other case all rows are kept. The serialized rows are therefore in reverse input order.
        /// </remarks>
        /// <param name="table">Table metadata: primary key properties, column definitions and JSON serializer settings.</param>
        /// <param name="rows">The rows to upsert. Must contain at least one item.</param>
        /// <param name="newDataQuery">Receives the "WITH ... AS (SELECT * FROM OPENJSON(...))" CTE text.</param>
        /// <param name="rowData">Receives the JSON serialization of the rows selected for upsert.</param>
        /// <exception cref="ArgumentNullException"><paramref name="rows"/> is null.</exception>
        /// <exception cref="InvalidOperationException"><paramref name="rows"/> is empty.</exception>
        private static void GenerateDataQueryForMerge(TableInformation table, IEnumerable<T> rows, out string newDataQuery, out string rowData)
        {
            // Materialize once: the rows are walked backwards below and the first row is read again afterwards,
            // so a lazily evaluated sequence would otherwise be enumerated (and possibly re-computed) twice.
            IReadOnlyList<T> rowList = rows as IReadOnlyList<T> ?? rows.ToList();

            IList<T> rowsToUpsert = new List<T>(rowList.Count);

            // De-duplication only applies to POCO rows on tables without identity primary keys:
            //  - If the table has an identity column as a primary key then all rows are guaranteed
            //    to be unique so we can insert them all.
            //  - ToDo: add check for duplicate primary keys for JObject once we find a way to get primary keys.
            bool dedupeByPrimaryKey = typeof(T) != typeof(JObject) && !table.HasIdentityColumnPrimaryKeys;

            var uniqueUpdatedPrimaryKeys = new HashSet<string>();
            // One builder reused for every row instead of a fresh, heavily pre-sized allocation per row.
            var combinedPrimaryKey = new StringBuilder();

            // If there are duplicate primary keys, we'll need to pick the LAST (most recent) row per primary key,
            // so iterate from the end and keep the first occurrence of each key.
            for (int i = rowList.Count - 1; i >= 0; i--)
            {
                T row = rowList[i];

                if (!dedupeByPrimaryKey)
                {
                    rowsToUpsert.Add(row);
                    continue;
                }

                combinedPrimaryKey.Clear();
                // Look up primary key of T. Because we're going in the same order of properties every time,
                // two rows with the same primary key values always produce the same combined string.
                foreach (PropertyInfo primaryKeyProperty in table.PrimaryKeyProperties)
                {
                    string keyText = primaryKeyProperty.GetValue(row)?.ToString();
                    if (keyText == null)
                    {
                        // A missing key component still occupies its position; otherwise (null, "x") and
                        // ("x", null) would be treated as the same key. Real lengths are never negative.
                        combinedPrimaryKey.Append("-1:");
                    }
                    else
                    {
                        // Length-prefix each component so that ("ab", "c") and ("a", "bc") cannot collide.
                        combinedPrimaryKey.Append(keyText.Length).Append(':').Append(keyText);
                    }
                }

                // HashSet.Add returns false when this primary key was already seen, i.e. a more recent row
                // with the same key has been queued; in that case the older row is skipped.
                if (uniqueUpdatedPrimaryKeys.Add(combinedPrimaryKey.ToString()))
                {
                    // This is the first time we've seen this particular PK. Add this row to the upsert query.
                    rowsToUpsert.Add(row);
                }
            }

            rowData = Utils.JsonSerializeObject(rowsToUpsert, table.JsonSerializerSettings);
            // The column list is derived from the first input row (throws InvalidOperationException when there are no rows).
            IEnumerable<string> columnNamesFromItem = GetColumnNamesFromItem(rowList.First());
            IEnumerable<string> bracketColumnDefinitionsFromItem = columnNamesFromItem.Select(c => $"{c.AsBracketQuotedString()} {table.Columns[c]}");
            // Escape any forward and backward slashes in the column names of rowData using REPLACE so the OPENJSON can read from those columns.
            newDataQuery = $"WITH {CteName} AS ( SELECT * FROM OPENJSON(REPLACE({RowDataParameter}, N'/', N'\\/')) WITH ({string.Join(",", bracketColumnDefinitionsFromItem)}) )";
        }