in src/SqlAsyncCollector.cs [362:417]
/// <summary>
/// Produces the two pieces of the merge query that depend on the incoming rows: the JSON payload
/// holding the rows to upsert, and the CTE prefix that reads that payload back through OPENJSON.
/// </summary>
/// <param name="table">Table metadata; supplies the primary key properties, the column definitions and the JSON serializer settings.</param>
/// <param name="rows">Rows to upsert. Enumerated exactly once. Must contain at least one element.</param>
/// <param name="newDataQuery">Receives the <c>WITH ... AS ( SELECT * FROM OPENJSON(...) WITH (...) )</c> prefix.</param>
/// <param name="rowData">Receives the serialized rows, last-to-first, with duplicate primary keys removed where detection is possible.</param>
/// <exception cref="InvalidOperationException">Thrown when <paramref name="rows"/> is empty, because the column list is derived from the first row.</exception>
/// <remarks>
/// Duplicate detection only runs when <c>T</c> is not <c>JObject</c> and the table does not report identity
/// column primary keys. In every case the rows are emitted in reverse input order.
/// </remarks>
private static void GenerateDataQueryForMerge(TableInformation table, IEnumerable<T> rows, out string newDataQuery, out string rowData)
{
    // Materialize once: the input is walked backwards here and its first element is inspected further down,
    // so a lazy sequence would otherwise be enumerated (and possibly re-computed) twice.
    IList<T> allRows = rows as IList<T> ?? rows.ToList();

    // Both conditions are loop-invariant, so decide up front whether duplicates need filtering.
    // - Identity column primary keys: every row is treated as unique, so all rows are kept.
    // - JObject rows: ToDo: add check for duplicate primary keys once we find a way to get primary keys.
    bool filterDuplicateKeys = typeof(T) != typeof(JObject) && !table.HasIdentityColumnPrimaryKeys;

    IList<T> rowsToUpsert = new List<T>(allRows.Count);
    var uniqueUpdatedPrimaryKeys = new HashSet<string>();

    // Walk from the end so that, when several rows share a primary key, the LAST (most recent) one wins.
    for (int index = allRows.Count - 1; index >= 0; index--)
    {
        T row = allRows[index];

        // HashSet.Add returns false when the key was already seen, i.e. a later row already claimed it.
        if (!filterDuplicateKeys || uniqueUpdatedPrimaryKeys.Add(BuildCombinedPrimaryKey(table, row)))
        {
            rowsToUpsert.Add(row);
        }
    }

    rowData = Utils.JsonSerializeObject(rowsToUpsert, table.JsonSerializerSettings);

    // The column list comes from the first input row; First() throws on an empty input.
    IEnumerable<string> columnNamesFromItem = GetColumnNamesFromItem(allRows.First());
    IEnumerable<string> bracketColumnDefinitionsFromItem = columnNamesFromItem.Select(c => $"{c.AsBracketQuotedString()} {table.Columns[c]}");

    // REPLACE rewrites every forward slash in the JSON payload as an escaped slash (\/) before OPENJSON parses it,
    // so that column names containing '/' can be read by OPENJSON.
    newDataQuery = $"WITH {CteName} AS ( SELECT * FROM OPENJSON(REPLACE({RowDataParameter}, N'/', N'\\/')) WITH ({string.Join(",", bracketColumnDefinitionsFromItem)}) )";
}

/// <summary>
/// Builds a string that identifies a row's primary key for in-memory duplicate detection.
/// </summary>
/// <param name="table">Table metadata supplying the primary key properties, read in their enumeration order.</param>
/// <param name="row">Row whose primary key values are read via reflection.</param>
/// <returns>
/// A concatenation of one segment per key property: <c>{length}:{text}</c> for a non-null value and
/// <c>-;</c> for a null value. Used only as a set key; it is never sent to the database.
/// </returns>
private static string BuildCombinedPrimaryKey(TableInformation table, T row)
{
    var combinedPrimaryKey = new StringBuilder();

    // Properties are visited in the same order for every row, so equal keys always produce equal strings.
    foreach (PropertyInfo primaryKeyProperty in table.PrimaryKeyProperties)
    {
        object value = primaryKeyProperty.GetValue(row);
        if (value == null)
        {
            // Keep a placeholder for a missing key part rather than skipping it, so that
            // (null, "a") and ("a", null) do not collapse to the same string.
            combinedPrimaryKey.Append("-;");
            continue;
        }

        // Length-prefix each part: plain concatenation would make ("ab", "c") and ("a", "bc")
        // indistinguishable and cause one of two distinct rows to be dropped.
        string text = value.ToString() ?? string.Empty;
        combinedPrimaryKey.Append(text.Length).Append(':').Append(text);
    }

    return combinedPrimaryKey.ToString();
}