in CosmosClone/CosmosCloneCommon/Migrator/DataScrubMigrator.cs [91:146]
/// <summary>
/// Reads <paramref name="query"/> one batch at a time, passes each batch through every rule in
/// <paramref name="scrubRules"/> (in list order) and sends the final result of the batch to
/// <c>cosmosBulkImporter.BulkSendToNewCollection</c>.
/// </summary>
/// <param name="query">Query whose string results are fetched via <c>GetCommonStringEntitiesinBatch</c>
/// until <c>HasMoreResults</c> is false.</param>
/// <param name="scrubRules">Rules handed to <c>ObjectScrubber.ScrubObjectList</c> one after another; the
/// output of one rule is the input of the next. Each rule's <c>RecordsUpdated</c> is increased by the
/// number of items the scrubber returned for that rule.</param>
/// <returns>A task that completes once the query has no more results.</returns>
/// <exception cref="ArgumentNullException"><paramref name="query"/> or <paramref name="scrubRules"/> is null.</exception>
/// <remarks>
/// Resets <c>TotalRecordsRetrieved</c> and <c>TotalRecordsScrubbed</c> to zero on entry and accumulates
/// them per batch. An exception thrown by the bulk upload is logged and rethrown, which ends the loop.
/// NOTE(review): when <paramref name="scrubRules"/> is empty, an empty list is sent for every non-empty
/// batch (nothing is uploaded) - confirm callers never pass an empty rule list.
/// </remarks>
public async Task ReadUploadInBatches(IDocumentQuery<string> query, List<ScrubRule> scrubRules)
{
    if (query == null)
    {
        throw new ArgumentNullException(nameof(query));
    }

    if (scrubRules == null)
    {
        throw new ArgumentNullException(nameof(scrubRules));
    }

    #region batchVariables
    //initialize Batch Process variables
    int batchCount = 0;
    TotalRecordsRetrieved = 0;
    TotalRecordsScrubbed = 0;
    Stopwatch stopwatch = Stopwatch.StartNew();
    var objScrubber = new ObjectScrubber();
    #endregion

    while (query.HasMoreResults)
    {
        batchCount++;
        logger.LogInfo($"BatchNumber : {batchCount} begins ");
        List<string> entities = await GetCommonStringEntitiesinBatch(query);
        TotalRecordsRetrieved += entities.Count;

        // Stays at its default state when the batch is empty, so the summary below reports what the
        // default BulkImportResponse exposes.
        BulkImportResponse uploadResponse = new BulkImportResponse();

        if (entities.Any())
        {
            var jEntities = new List<JToken>();
            List<string> scrubInput = entities;
            bool isFirstRule = true;

            foreach (var scrubRule in scrubRules)
            {
                if (!isFirstRule)
                {
                    // The scrubber takes strings, so the previous rule's tokens are serialized again.
                    // Doing it here (not after every rule) avoids serializing the final result,
                    // which is uploaded as tokens and never needed as strings.
                    scrubInput = jEntities.Select(token => JsonConvert.SerializeObject(token)).ToList();
                }
                isFirstRule = false;

                jEntities = objScrubber.ScrubObjectList(scrubInput, scrubRule);
                scrubRule.RecordsUpdated += jEntities.Count;
            }

            var objEntities = jEntities.Cast<Object>().ToList();
            try
            {
                uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(objEntities);
            }
            catch (Exception ex)
            {
                logger.LogError(ex);
                throw;
            }
        }

        // Documents rejected by the importer used to be dropped silently; surface them in the log.
        // The list may be unset on a default-constructed response, hence the null-safe access.
        int rejectedCount = uploadResponse.BadInputDocuments?.Count ?? 0;
        if (rejectedCount > 0)
        {
            logger.LogInfo($"Batch {batchCount}: {rejectedCount} documents were rejected as bad input and not uploaded");
        }

        TotalRecordsScrubbed += uploadResponse.NumberOfDocumentsImported;
        logger.LogInfo($"Summary of Batch {batchCount} records retrieved {entities.Count}. Records Uploaded: {uploadResponse.NumberOfDocumentsImported}");
        logger.LogInfo($"Total records retrieved {TotalRecordsRetrieved}. Total records uploaded {TotalRecordsScrubbed}");
        logger.LogInfo($"Time elapsed : {stopwatch.Elapsed} ");
    }

    stopwatch.Stop();
    logger.LogInfo("Document Scrubbing completed");
}