in CosmosClone/CosmosCloneCommon/Utility/CosmosBulkImporter.cs [49:90]
/// <summary>
/// Bulk-imports <paramref name="entityList"/> through the configured bulk executor,
/// with upsert enabled and automatic id generation disabled, then logs a throughput summary.
/// </summary>
/// <remarks>
/// The whole batch is re-submitted while the executor reports fewer imported documents
/// than were supplied, for at most <c>MaxRetries</c> additional attempts after the first.
/// </remarks>
/// <typeparam name="T">Type of the entities being imported.</typeparam>
/// <param name="entityList">Entities to import; must not be null.</param>
/// <returns>The <see cref="BulkImportResponse"/> of the last import attempt.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entityList"/> is null.</exception>
public async Task<BulkImportResponse> BulkSendToNewCollection<T>(List<T> entityList)
{
    if (entityList == null)
    {
        throw new ArgumentNullException(nameof(entityList));
    }

    // Retries allowed after the initial attempt (so at most MaxRetries + 1 calls in total).
    const int MaxRetries = 5;

    BulkImportResponse bulkImportResponse;
    int attempts = 0;
    var objList = entityList.Cast<Object>();

    // NOTE(review): documents reported in BadInputDocuments will presumably be rejected
    // again on every retry, exhausting all attempts - confirm against the BulkExecutor
    // documentation and consider excluding them from the retry condition.
    do
    {
        // No caller-supplied cancellation exists for this method, so pass
        // CancellationToken.None instead of allocating a CancellationTokenSource
        // that is never cancelled and never disposed.
        bulkImportResponse = await bulkExecutor.BulkImportAsync(
            documents: objList,
            enableUpsert: true,
            disableAutomaticIdGeneration: true,
            maxConcurrencyPerPartitionKeyRange: null,
            maxInMemorySortingBatchSize: null,
            cancellationToken: CancellationToken.None);
        attempts++;
    } while (bulkImportResponse.NumberOfDocumentsImported < entityList.Count && attempts <= MaxRetries);

    var badDocumentList = bulkImportResponse.BadInputDocuments;
    var importedCount = bulkImportResponse.NumberOfDocumentsImported;
    var elapsedSeconds = bulkImportResponse.TotalTimeTaken.TotalSeconds;
    var requestUnits = bulkImportResponse.TotalRequestUnitsConsumed;

    #region log bulk Summary
    logger.LogInfo("\n Batch Upload completed ");
    logger.LogInfo("--------------------------------------------------------------------- ");
    logger.LogInfo(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
        importedCount,
        SafeRatio(importedCount, elapsedSeconds),
        SafeRatio(requestUnits, elapsedSeconds),
        elapsedSeconds));
    logger.LogInfo(String.Format("Average RU consumption per document: {0}",
        SafeRatio(requestUnits, importedCount)));
    if (badDocumentList != null && badDocumentList.Count > 0)
    {
        logger.LogInfo($"bad Documents detected {badDocumentList.Count}");
    }
    logger.LogInfo("---------------------------------------------------------------------\n ");
    #endregion

    return bulkImportResponse;
}

/// <summary>
/// Divides <paramref name="total"/> by <paramref name="divisor"/> and rounds to two decimals;
/// returns 0 when the divisor is not positive so the summary never logs NaN or Infinity.
/// </summary>
private static double SafeRatio(double total, double divisor)
{
    return divisor > 0 ? Math.Round(total / divisor, 2) : 0;
}