in CosmosClone/CosmosCloneCommon/Migrator/DocumentMigrator.cs [165:232]
public async Task ReadUploadInBatches(IDocumentQuery<dynamic> query)
{
#region batchVariables
//initialize Batch Process variables
int batchCount = 0;
TotalRecordsRetrieved = 0;
TotalRecordsSent = 0;
var badEntities = new List<Object>();
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
#endregion
while (query.HasMoreResults)
{
batchCount++;
logger.LogInfo($"BatchNumber : {batchCount} begins ");
List<dynamic> entities = await GetCommonEntitiesinBatch(query);
TotalRecordsRetrieved += entities.Count();
List<object> objEntities = new List<object>();
objEntities.AddRange((IEnumerable<object>)entities);
List<string> strEntities = new List<string>();
foreach (var obj in objEntities)
{
strEntities.Add(JsonConvert.SerializeObject(obj));
}
BulkImportResponse uploadResponse = new BulkImportResponse();
var scrubbedEntities = strEntities;
if (entities.Any())
{
if( noFilterScrubRules == null || noFilterScrubRules.Count==0)
{
uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(entities);
}
else
{
var jEntities = new List<JToken>();
foreach (var sRule in noFilterScrubRules)
{
jEntities = objectScrubber.ScrubObjectList(scrubbedEntities, sRule);
var nentities = new List<string>();
foreach (var jobj in jEntities)
{
nentities.Add(JsonConvert.SerializeObject(jobj));
}
scrubbedEntities = nentities;
sRule.RecordsUpdated += jEntities.Count;
}
var objDocuments = jEntities.Cast<Object>().ToList();
uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(objDocuments);
}
}
else
{
logger.LogInfo("No Entities retrieved from query");
continue;
}
badEntities = uploadResponse.BadInputDocuments;
TotalRecordsSent += uploadResponse.NumberOfDocumentsImported;
logger.LogInfo($"Summary of Batch {batchCount} records retrieved {entities.Count()}. Records Uploaded: {uploadResponse.NumberOfDocumentsImported}");
logger.LogInfo($"Total records retrieved {TotalRecordsRetrieved}. Total records uploaded {TotalRecordsSent}");
logger.LogInfo($"Time elapsed : {stopwatch.Elapsed} ");
}
SetCompleteOnNoFilterRules();
stopwatch.Stop();
logger.LogInfo("Document Migration completed");
}