public async Task ReadUploadInBatches()

in CosmosClone/CosmosCloneCommon/Migrator/DataScrubMigrator.cs [91:146]


        /// <summary>
        /// Reads <paramref name="query"/> batch by batch, runs every rule in
        /// <paramref name="scrubRules"/> over each batch in order, and bulk-uploads the
        /// scrubbed documents through <c>cosmosBulkImporter</c>.
        /// </summary>
        /// <remarks>
        /// <c>TotalRecordsRetrieved</c> and <c>TotalRecordsScrubbed</c> are reset on entry and
        /// accumulated per batch. Each rule's <c>RecordsUpdated</c> is increased by the number
        /// of documents the scrubber returned for that rule, before the upload is attempted.
        /// NOTE(review): when <paramref name="scrubRules"/> is empty nothing is uploaded for a
        /// batch, because the upload list is built from the last rule's output — confirm callers
        /// never pass an empty rule list.
        /// </remarks>
        /// <param name="query">Query whose results are fetched as strings, one batch per iteration.</param>
        /// <param name="scrubRules">Rules applied sequentially; the output of one rule is the input of the next.</param>
        /// <returns>A task that completes once the query reports no more results.</returns>
        /// <exception cref="Exception">Any exception thrown by the bulk upload is logged and rethrown unchanged.</exception>
        public async Task ReadUploadInBatches(IDocumentQuery<string> query, List<ScrubRule> scrubRules)
        {
            #region batchVariables
            //initialize Batch Process variables
            int batchCount = 0;
            TotalRecordsRetrieved = 0;
            TotalRecordsScrubbed = 0;
            Stopwatch stopwatch = Stopwatch.StartNew();
            var objScrubber = new ObjectScrubber();
            #endregion
            while (query.HasMoreResults)
            {
                batchCount++;
                logger.LogInfo($"BatchNumber : {batchCount} begins ");
                List<string> entities = await GetCommonStringEntitiesinBatch(query);
                TotalRecordsRetrieved += entities.Count;

                // Default response keeps the summary below valid for an empty batch.
                BulkImportResponse uploadResponse = new BulkImportResponse();
                if (entities.Count > 0)
                {
                    var scrubbedEntities = entities;
                    var jEntities = new List<JToken>();
                    foreach (var scrubRule in scrubRules)
                    {
                        jEntities = objScrubber.ScrubObjectList(scrubbedEntities, scrubRule);

                        // The scrubber consumes strings, so re-serialize this rule's output
                        // to feed the next rule in the chain.
                        scrubbedEntities = jEntities.Select(jobj => JsonConvert.SerializeObject(jobj)).ToList();
                        scrubRule.RecordsUpdated += jEntities.Count;
                    }

                    var objEntities = jEntities.Cast<Object>().ToList();
                    try
                    {
                        uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(objEntities);
                    }
                    catch (Exception ex)
                    {
                        logger.LogError(ex);
                        throw;
                    }
                }

                TotalRecordsScrubbed += uploadResponse.NumberOfDocumentsImported;

                // Surface documents the importer reported in BadInputDocuments so they do
                // not vanish silently from the run log.
                int rejectedCount = uploadResponse.BadInputDocuments?.Count ?? 0;
                if (rejectedCount > 0)
                {
                    logger.LogInfo($"Batch {batchCount} bad input documents reported by bulk import: {rejectedCount}");
                }

                logger.LogInfo($"Summary of Batch {batchCount} records retrieved {entities.Count}. Records Uploaded: {uploadResponse.NumberOfDocumentsImported}");
                logger.LogInfo($"Total records retrieved {TotalRecordsRetrieved}. Total records uploaded {TotalRecordsScrubbed}");
                logger.LogInfo($"Time elapsed : {stopwatch.Elapsed} ");
            }

            stopwatch.Stop();
            logger.LogInfo("Document Scrubbing completed");
        }