private async Task UpsertRowsAsync()

in src/MySqlAsyncCollector.cs [163:261]


        /// <summary>
        /// Upserts <paramref name="rows"/> into the table named by <c>attribute.CommandText</c>.
        /// The rows are written in batches of 1000 inside a single transaction; the transaction is
        /// committed only after every batch has executed, and rolled back if any batch fails.
        /// Table metadata is looked up once and kept in <see cref="MemoryCache.Default"/>, keyed by the
        /// connection string hash and table name.
        /// </summary>
        /// <param name="rows">The rows to upsert. An empty list is a no-op.</param>
        /// <param name="attribute">Binding attribute supplying the connection string setting name and the target table name.</param>
        /// <param name="configuration">Configuration passed to <c>BuildConnection</c> to build the connection.</param>
        /// <returns>A task that completes once the transaction has been committed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="rows"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// The first row has properties that do not exist in the table, or executing/committing the batches failed
        /// (the original error is the inner exception; the transaction has been rolled back).
        /// </exception>
        /// <exception cref="AggregateException">Both the upsert and the subsequent rollback failed.</exception>
        private async Task UpsertRowsAsync(IList<T> rows, MySqlAttribute attribute, IConfiguration configuration)
        {
            if (rows == null)
            {
                throw new ArgumentNullException(nameof(rows));
            }

            // Nothing to write: return before opening a connection instead of failing later
            // with "Sequence contains no elements" when the first row is inspected.
            if (rows.Count == 0)
            {
                return;
            }

            using (MySqlConnection connection = BuildConnection(attribute.ConnectionStringSetting, configuration))
            {
                await connection.OpenAsync();
                string fullTableName = attribute.CommandText;

                // The first row is used as the representative item for column discovery and validation.
                T firstRow = rows[0];

                // Include the connection string hash as part of the key in case this customer has the same table in two different MySql Servers
                string cacheKey = $"{connection.ConnectionString.GetHashCode()}-{fullTableName}";

                ObjectCache cachedTables = MemoryCache.Default;

                // Cache lifetime in minutes; the environment variable overrides the default when it parses as an integer.
                int timeout = AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES;
                string timeoutEnvVar = Environment.GetEnvironmentVariable("AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES");
                if (!string.IsNullOrEmpty(timeoutEnvVar))
                {
                    if (int.TryParse(timeoutEnvVar, NumberStyles.Integer, CultureInfo.InvariantCulture, out timeout))
                    {
                        this._logger.LogDebug($"Overriding default table info cache timeout with new value {timeout}");
                    }
                    else
                    {
                        // TryParse zeroes the out value on failure, so restore the default explicitly.
                        timeout = AZ_FUNC_TABLE_INFO_CACHE_TIMEOUT_MINUTES;
                    }
                }

                if (!(cachedTables[cacheKey] is TableInformation tableInfo))
                {
                    this._logger.LogInformation($"Sending event TableInfoCacheMiss");
                    // set the columnNames for supporting T as JObject since it doesn't have columns in the member info.
                    tableInfo = TableInformation.RetrieveTableInformation(connection, fullTableName, this._logger, GetColumnNamesFromItem(firstRow));
                    var policy = new CacheItemPolicy
                    {
                        // Re-look up the primary key(s) after timeout (default timeout is 10 minutes)
                        AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(timeout)
                    };

                    cachedTables.Set(cacheKey, tableInfo, policy);
                }
                else
                {
                    this._logger.LogInformation($"Sending event TableInfoCacheHit");
                }

                // Materialize once: the result is both tested for emptiness and printed in the error message.
                List<string> extraProperties = GetExtraProperties(tableInfo.Columns, firstRow).ToList();
                if (extraProperties.Count > 0)
                {
                    // Name the offending properties and the table so the failure is actionable.
                    string message = $"The following properties in {typeof(T)} do not exist in the specified table" +
                        $" {fullTableName}: {string.Join(", ", extraProperties)}";
                    throw new InvalidOperationException(message);
                }

                IEnumerable<string> columnNamesFromItem = GetColumnNamesFromItem(firstRow);

                var table = new MySqlObject(fullTableName);
                string insertQuery = TableInformation.GetInsertQuery(table, columnNamesFromItem);

                string duplicateUpdateQuery = TableInformation.GetOnDuplicateUpdateQuery(columnNamesFromItem);

                var transactionSw = Stopwatch.StartNew();
                int batchSize = 1000;
                // NOTE: the transaction is intentionally not wrapped in a using block. Some providers attempt
                // another rollback on dispose, which could mask the AggregateException thrown below when the
                // explicit rollback fails; disposing the connection releases it.
                MySqlTransaction transaction = connection.BeginTransaction();
                try
                {
                    // The same command object is reused for every batch and disposed when the batches are done.
                    using (MySqlCommand command = connection.CreateCommand())
                    {
                        command.Connection = connection;
                        command.Transaction = transaction;
                        int batchCount = 0;
                        var commandSw = Stopwatch.StartNew();
                        foreach (IEnumerable<T> batch in rows.Batch(batchSize))
                        {
                            batchCount++;
                            GenerateDataQueryForMerge(tableInfo, batch, columnNamesFromItem, out string newDataQuery);
                            // Statement = insert prefix + per-batch data + on-duplicate update clause.
                            command.CommandText = $"{insertQuery} {newDataQuery} {duplicateUpdateQuery};";

                            await command.ExecuteNonQueryAsyncWithLogging(this._logger, CancellationToken.None, true);
                        }
                        transaction.Commit();
                        transactionSw.Stop();
                        this._logger.LogInformation($"Sending event Upsert Rows - BatchCount: {batchCount}, TransactionDurationMs: {transactionSw.ElapsedMilliseconds}," +
                            $"CommandDurationMs: {commandSw.ElapsedMilliseconds}, BatchSize: {batchSize}, Rows: {rows.Count}");
                    }
                }
                catch (Exception ex)
                {
                    try
                    {
                        this._logger.LogError($"Error Upserting rows. Message:{ex.Message}");
                        transaction.Rollback();
                    }
                    catch (Exception ex2)
                    {
                        // Rollback failed as well: surface both the original and the rollback error.
                        this._logger.LogError($"Error Upserting rows and rollback. Message:{ex2.Message}");
                        string message2 = $"Encountered exception during upsert and rollback.";
                        throw new AggregateException(message2, new List<Exception> { ex, ex2 });
                    }
                    throw new InvalidOperationException($"Unexpected error upserting rows", ex);
                }
            }
        }