in AdlsDotNetSDK/FileTransfer/FileDownloader.cs [191:274]
protected override void FirstPassProducerRun()
{
do
{
if (CancelToken.IsCancellationRequested)
{
return;
}
var der = DownloaderProducerQueue.Poll();
if (der == null) //Means end of Producer
{
DownloaderProducerQueue.Add(null); //Notify if any other threads are waiting
return;
}
try
{
long numDirs = 0, numFiles = 0, totChunks = 0, unchunkedFiles = 0, totSize = 0, isEmpty =0;
var fop = Client.EnumerateDirectory(der.FullName);
foreach (var dir in fop)
{
isEmpty = 1;
if (dir.Type == DirectoryEntryType.DIRECTORY)
{
if (NotRecurse)//Directly add the directories to be created since we won't go in recursively
{
if (!AddDirectoryToConsumerQueue(dir.FullName, false))
{
continue;
}
}
else
{
DownloaderProducerQueue.Add(dir);
}
numDirs++;
}
else
{
if (RecordedMetadata.EntryTransferredSuccessfulLastTime(dir.FullName))
{
continue;
}
// We calculate the total files here only even though some files are chunked or non-chunked in the final producer pass
numFiles++;
long fileSizeToTransfer = 0;
// If we are resuming and last time we chunked this file and it is incomplete so we want to chunk it this time also
if (RecordedMetadata.EntryTransferredIncompleteLastTime(dir.FullName))
{
long chunks = AddFileToConsumerQueue(dir.FullName, dir.Length, true, out fileSizeToTransfer);
totChunks += chunks;
}
// If the length is less than skip chunking weight threshold then we will add them directly to job queue as non-chunked jobs
else if (dir.Length <= SkipChunkingWeightThreshold)
{
AddFileToConsumerQueue(dir.FullName, dir.Length, false, out fileSizeToTransfer);
unchunkedFiles++;
}// We will only update the totSize based on number of chunks or unchunked files that will get transfered this turn
else // We are not sure, so we will store them in internal list
{
if (dir.Length > ChunkWeightThreshold)
{
Interlocked.Increment(ref _numLargeFiles);
}
AddDirectoryEntryToList(dir);
}
totSize += fileSizeToTransfer;
}
}
bool isDirectoryEmptyAndNotDownloadedYet = false;
if (isEmpty == 0)
{
isDirectoryEmptyAndNotDownloadedYet= AddDirectoryToConsumerQueue(der.FullName, false);
}
// If there are any sub directories and it is not recurse update the number of directories
StatusUpdate(numFiles, unchunkedFiles, totChunks, totSize, NotRecurse ? numDirs : (isDirectoryEmptyAndNotDownloadedYet ? 1 : 0));
}
catch (AdlsException ex)
{
Status.EntriesFailed.Add(new SingleEntryTransferStatus(der.FullName, null, ex.Message,
EntryType.Directory, SingleChunkStatus.Failed));
}
} while (!NotRecurse);
}