public async Task ReadUploadInBatches()

in CosmosClone/CosmosCloneCommon/Migrator/DocumentMigrator.cs [165:232]


        /// <summary>
        /// Reads <paramref name="query"/> one batch at a time and bulk-uploads each non-empty
        /// batch through <c>cosmosBulkImporter</c>. When <c>noFilterScrubRules</c> contains
        /// rules, every rule is applied in order (each rule receives the output of the previous
        /// one) and the scrubbed documents are uploaded instead of the raw ones.
        /// </summary>
        /// <remarks>
        /// <c>TotalRecordsRetrieved</c> and <c>TotalRecordsSent</c> are reset to zero on entry and
        /// updated after every batch. After the query is exhausted,
        /// <c>SetCompleteOnNoFilterRules</c> is invoked.
        /// </remarks>
        /// <param name="query">Document query to drain; read until <c>HasMoreResults</c> is false.</param>
        /// <returns>A task that completes once every batch has been read and uploaded.</returns>
        public async Task ReadUploadInBatches(IDocumentQuery<dynamic> query)
        {
            // Reset the running totals so that repeated invocations start from a clean slate.
            int batchCount = 0;
            TotalRecordsRetrieved = 0;
            TotalRecordsSent = 0;
            Stopwatch stopwatch = Stopwatch.StartNew();

            while (query.HasMoreResults)
            {
                batchCount++;
                logger.LogInfo($"BatchNumber : {batchCount} begins ");

                List<dynamic> entities = await GetCommonEntitiesinBatch(query);
                TotalRecordsRetrieved += entities.Count;

                // Nothing to upload for this batch; move on to the next page of results.
                if (entities.Count == 0)
                {
                    logger.LogInfo("No Entities retrieved from query");
                    continue;
                }

                BulkImportResponse uploadResponse;
                if (noFilterScrubRules == null || noFilterScrubRules.Count == 0)
                {
                    // No scrubbing configured: upload the documents exactly as they were read.
                    uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(entities);
                }
                else
                {
                    // The scrubber works on JSON text, so serialize only on this path.
                    // Iterating as object (not dynamic) keeps the calls statically bound.
                    var scrubbedEntities = new List<string>(entities.Count);
                    foreach (object entity in entities)
                    {
                        scrubbedEntities.Add(JsonConvert.SerializeObject(entity));
                    }

                    var jEntities = new List<JToken>();
                    foreach (var sRule in noFilterScrubRules)
                    {
                        // Chain the rules: the output of one rule is the input of the next.
                        jEntities = objectScrubber.ScrubObjectList(scrubbedEntities, sRule);
                        scrubbedEntities = jEntities
                            .Select(token => JsonConvert.SerializeObject(token))
                            .ToList();
                        sRule.RecordsUpdated += jEntities.Count;
                    }

                    // Upload the tokens produced by the last rule.
                    var objDocuments = jEntities.Cast<Object>().ToList();
                    uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(objDocuments);
                }

                TotalRecordsSent += uploadResponse.NumberOfDocumentsImported;

                // Surface documents the importer reported as bad input instead of silently dropping them.
                int badDocumentCount = uploadResponse.BadInputDocuments?.Count ?? 0;
                if (badDocumentCount > 0)
                {
                    logger.LogInfo($"Batch {batchCount} reported {badDocumentCount} bad input document(s) that were not imported");
                }

                logger.LogInfo($"Summary of Batch {batchCount} records retrieved {entities.Count}. Records Uploaded: {uploadResponse.NumberOfDocumentsImported}");
                logger.LogInfo($"Total records retrieved {TotalRecordsRetrieved}. Total records uploaded {TotalRecordsSent}");
                logger.LogInfo($"Time elapsed : {stopwatch.Elapsed} ");
            }

            SetCompleteOnNoFilterRules();
            stopwatch.Stop();
            logger.LogInfo("Document Migration completed");
        }