protected override void FirstPassProducerRun()

in AdlsDotNetSDK/FileTransfer/FileDownloader.cs [191:274]


        /// <summary>
        /// First producer pass. Polls directories from <c>DownloaderProducerQueue</c>, enumerates each one and
        /// dispatches every entry it contains: sub directories are either queued for further enumeration or,
        /// when <c>NotRecurse</c> is set, handed straight to the consumer queue; files are queued as chunked or
        /// non-chunked jobs, or parked in the internal list for a later pass. After each directory the collected
        /// counters are reported through <c>StatusUpdate</c>.
        /// </summary>
        /// <remarks>
        /// The loop runs exactly once when <c>NotRecurse</c> is set; otherwise it keeps polling until cancellation
        /// is requested or a null entry is polled. An <c>AdlsException</c> raised while processing a directory is
        /// recorded in <c>Status.EntriesFailed</c> against that directory and the loop carries on; in that case
        /// the counters gathered for the directory are not reported.
        /// </remarks>
        protected override void FirstPassProducerRun()
        {
            do
            {
                if (CancelToken.IsCancellationRequested)
                {
                    return;
                }

                var currentDirectory = DownloaderProducerQueue.Poll();
                if (currentDirectory == null)
                {
                    // A null entry marks the end of production. Put it back so that any other
                    // thread waiting on the queue is notified as well.
                    DownloaderProducerQueue.Add(null);
                    return;
                }

                try
                {
                    long directoryCount = 0;
                    long fileCount = 0;
                    long chunkCount = 0;
                    long unchunkedFileCount = 0;
                    long bytesToTransfer = 0;
                    bool sawAnyEntry = false;

                    foreach (var entry in Client.EnumerateDirectory(currentDirectory.FullName))
                    {
                        // Set for every enumerated entry, including ones that are skipped below.
                        sawAnyEntry = true;

                        if (entry.Type == DirectoryEntryType.DIRECTORY)
                        {
                            if (!NotRecurse)
                            {
                                // Recursive run: the sub directory is enumerated by a later iteration.
                                DownloaderProducerQueue.Add(entry);
                                directoryCount++;
                            }
                            else if (AddDirectoryToConsumerQueue(entry.FullName, false))
                            {
                                // Non-recursive run: we never descend, so the directory is queued for creation
                                // directly. It is counted only when the call reports true.
                                directoryCount++;
                            }

                            continue;
                        }

                        // Files recorded as successfully transferred last time are skipped and not counted.
                        if (RecordedMetadata.EntryTransferredSuccessfulLastTime(entry.FullName))
                        {
                            continue;
                        }

                        // The file total is taken here, even though whether some files end up chunked or
                        // non-chunked is only decided in the final producer pass.
                        fileCount++;

                        // Stays 0 for files parked in the internal list; only work queued in this pass adds to the size.
                        long entryBytes = 0;

                        if (RecordedMetadata.EntryTransferredIncompleteLastTime(entry.FullName))
                        {
                            // Resuming a file that was chunked but left incomplete last time: chunk it again.
                            chunkCount += AddFileToConsumerQueue(entry.FullName, entry.Length, true, out entryBytes);
                        }
                        else if (entry.Length <= SkipChunkingWeightThreshold)
                        {
                            // At or below the skip-chunking threshold: queue directly as a non-chunked job.
                            AddFileToConsumerQueue(entry.FullName, entry.Length, false, out entryBytes);
                            unchunkedFileCount++;
                        }
                        else
                        {
                            // Undecided for now: keep the entry in the internal list.
                            if (entry.Length > ChunkWeightThreshold)
                            {
                                Interlocked.Increment(ref _numLargeFiles);
                            }

                            AddDirectoryEntryToList(entry);
                        }

                        bytesToTransfer += entryBytes;
                    }

                    // A directory that yielded no entries is itself offered to the consumer queue; the call is
                    // made only in that case.
                    bool emptyDirectoryQueued = !sawAnyEntry
                                                && AddDirectoryToConsumerQueue(currentDirectory.FullName, false);

                    // Non-recursive runs report the sub directories queued above; recursive runs report only
                    // this directory, and only when it was empty and accepted by the consumer queue.
                    long directoriesToReport = NotRecurse ? directoryCount : (emptyDirectoryQueued ? 1 : 0);

                    StatusUpdate(fileCount, unchunkedFileCount, chunkCount, bytesToTransfer, directoriesToReport);
                }
                catch (AdlsException ex)
                {
                    var failure = new SingleEntryTransferStatus(currentDirectory.FullName, null, ex.Message,
                        EntryType.Directory, SingleChunkStatus.Failed);
                    Status.EntriesFailed.Add(failure);
                }
            } while (!NotRecurse);
        }