in src/MySqlAsyncCollector.cs [163:261]
/// <summary>
/// Upserts <paramref name="rows"/> into the table named by <c>attribute.CommandText</c>.
/// The rows are sent in batches inside a single transaction, each batch as one
/// insert statement followed by an on-duplicate-update clause.
/// </summary>
/// <remarks>
/// Table metadata is looked up through <see cref="TableInformation.RetrieveTableInformation"/> and kept in
/// <see cref="MemoryCache.Default"/>. The cache lifetime (in minutes) comes from the
/// AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES constant and can be overridden with the environment
/// variable of the same name. The column list is derived from the first row only.
/// </remarks>
/// <param name="rows">Rows to upsert. An empty list is a no-op.</param>
/// <param name="attribute">Binding attribute supplying the connection string setting name and the target table name.</param>
/// <param name="configuration">Configuration used to build the connection.</param>
/// <exception cref="ArgumentNullException"><paramref name="rows"/> is null.</exception>
/// <exception cref="InvalidOperationException">
/// The first row has properties that do not exist in the table, or a batch failed and the transaction was rolled back.
/// </exception>
/// <exception cref="AggregateException">A batch failed and the subsequent rollback failed as well.</exception>
private async Task UpsertRowsAsync(IList<T> rows, MySqlAttribute attribute, IConfiguration configuration)
{
    if (rows == null)
    {
        throw new ArgumentNullException(nameof(rows));
    }
    if (rows.Count == 0)
    {
        // Nothing to write; avoid opening a connection (and failing on the first-row lookup below).
        return;
    }

    using (MySqlConnection connection = BuildConnection(attribute.ConnectionStringSetting, configuration))
    {
        await connection.OpenAsync();
        string fullTableName = attribute.CommandText;
        // Include the connection string hash as part of the key in case this customer has the same table in two different MySql Servers
        string cacheKey = $"{connection.ConnectionString.GetHashCode()}-{fullTableName}";
        ObjectCache cachedTables = MemoryCache.Default;

        // Start from the built-in default and only replace it when the environment variable parses cleanly.
        int timeout = AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES;
        string timeoutEnvVar = Environment.GetEnvironmentVariable("AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES");
        if (!string.IsNullOrEmpty(timeoutEnvVar)
            && int.TryParse(timeoutEnvVar, NumberStyles.Integer, CultureInfo.InvariantCulture, out int parsedTimeout))
        {
            timeout = parsedTimeout;
            this._logger.LogDebug($"Overriding default table info cache timeout with new value {timeout}");
        }

        // The first row is the template for the column list used by every batch.
        T firstRow = rows[0];
        IEnumerable<string> columnNamesFromItem = GetColumnNamesFromItem(firstRow);

        if (!(cachedTables[cacheKey] is TableInformation tableInfo))
        {
            this._logger.LogInformation($"Sending event TableInfoCacheMiss");
            // set the columnNames for supporting T as JObject since it doesn't have columns in the member info.
            tableInfo = TableInformation.RetrieveTableInformation(connection, fullTableName, this._logger, columnNamesFromItem);
            var policy = new CacheItemPolicy
            {
                // Re-look up the table information once the timeout has elapsed
                AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(timeout)
            };
            cachedTables.Set(cacheKey, tableInfo, policy);
        }
        else
        {
            this._logger.LogInformation($"Sending event TableInfoCacheHit");
        }

        // Materialize once: the result is both tested for emptiness and written into the error message.
        List<string> extraProperties = GetExtraProperties(tableInfo.Columns, firstRow).ToList();
        if (extraProperties.Count > 0)
        {
            string message = $"The following properties in {typeof(T)} do not exist in the specified table: {string.Join(", ", extraProperties)}";
            throw new InvalidOperationException(message);
        }

        var table = new MySqlObject(fullTableName);
        string insertQuery = TableInformation.GetInsertQuery(table, columnNamesFromItem);
        string duplicateUpdateQuery = TableInformation.GetOnDuplicateUpdateQuery(columnNamesFromItem);
        var transactionSw = Stopwatch.StartNew();
        int batchSize = 1000;

        // Both the transaction and the command are IDisposable; release them deterministically.
        using (MySqlTransaction transaction = connection.BeginTransaction())
        {
            try
            {
                using (MySqlCommand command = connection.CreateCommand())
                {
                    command.Connection = connection;
                    command.Transaction = transaction;
                    int batchCount = 0;
                    var commandSw = Stopwatch.StartNew();
                    foreach (IEnumerable<T> batch in rows.Batch(batchSize))
                    {
                        batchCount++;
                        GenerateDataQueryForMerge(tableInfo, batch, columnNamesFromItem, out string newDataQuery);
                        command.CommandText = $"{insertQuery} {newDataQuery} {duplicateUpdateQuery};";
                        await command.ExecuteNonQueryAsyncWithLogging(this._logger, CancellationToken.None, true);
                    }
                    transaction.Commit();
                    transactionSw.Stop();
                    this._logger.LogInformation($"Sending event Upsert Rows - BatchCount: {batchCount}, TransactionDurationMs: {transactionSw.ElapsedMilliseconds}," +
                        $"CommandDurationMs: {commandSw.ElapsedMilliseconds}, BatchSize: {batchSize}, Rows: {rows.Count}");
                }
            }
            catch (Exception ex)
            {
                try
                {
                    this._logger.LogError($"Error Upserting rows. Message:{ex.Message}");
                    transaction.Rollback();
                }
                catch (Exception ex2)
                {
                    // Surface both failures so the original cause is not lost behind the rollback error.
                    this._logger.LogError($"Error Upserting rows and rollback. Message:{ex2.Message}");
                    string message2 = $"Encountered exception during upsert and rollback.";
                    throw new AggregateException(message2, new List<Exception> { ex, ex2 });
                }
                throw new InvalidOperationException($"Unexpected error upserting rows", ex);
            }
        }
    }
}