AdlsDotNetSDK/FileTransfer/FileDownloader.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.Azure.DataLake.Store.FileTransfer.Jobs;
using Microsoft.Azure.DataLake.Store.QueueTools;
namespace Microsoft.Azure.DataLake.Store.FileTransfer
{
/// <summary>
/// Class that implements the download-specific logic
/// </summary>
internal sealed class FileDownloader : FileTransferCommon
{
/// <summary>
/// Threshold above which a file is considered large and may need chunking for download
/// </summary>
internal static long ChunkWeightThreshold = 5 * 1024 * 1024 * 1024L;
/// <summary>
/// Large files are chunked only if the number of large files is below this threshold. For example, with 100 large files there is no need to chunk,
/// because all download threads will stay busy throughout the download anyway
/// </summary>
internal static long NumLargeFileThreshold = 20;
/// <summary>
/// Files with sizes at or below this limit will never be chunked
/// </summary>
internal static long SkipChunkingWeightThreshold = 1 * 1024 * 1024 * 1024L;
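// Illustrative interplay of the three thresholds above, using their default values (a sketch of the
// first-pass/final-pass decision, not additional logic): a 512 MB file is at or below
// SkipChunkingWeightThreshold (1 GB) and is always downloaded as a single non-chunked job; a 3 GB file
// is deferred to the final producer pass and chunked only if fewer than NumLargeFileThreshold (20)
// files exceed ChunkWeightThreshold (5 GB); a 6 GB file is deferred the same way and additionally
// counts toward that large-file total.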
// Number of large files to download; known only after enumeration
private int _numLargeFiles;
// Capacity of the internal list that stores the enumerated files temporarily
private const int DownloaderListCapacity = 10240;
// Internal list that stores the enumerated files temporarily
private List<DirectoryEntry> DownloaderList { get; }
private const int NumProducerThreadsFirstPass = 20;
internal int EgressBufferCapacity { get; }
/// <summary>
/// FIFO queue containing directories for producer
/// </summary>
private QueueWrapper<DirectoryEntry> DownloaderProducerQueue { get; }
private FileDownloader(string srcPath, string destPath, AdlsClient client, int numThreads,
IfExists doOverwrite, IProgress<TransferStatus> progressTracker, bool notRecurse, bool disableTransferLogging, bool resume, CancellationToken cancelToken, bool egressTest, int egressBufferCapacity, long chunkSize) : base(srcPath, destPath, client, numThreads, doOverwrite, progressTracker, notRecurse, disableTransferLogging, resume, egressTest, chunkSize, Path.Combine(Path.GetTempPath(), ".adl", "Download", GetTransferLogFileName(client.AccountFQDN, srcPath, destPath, '/', Path.DirectorySeparatorChar)), cancelToken)
{
EgressBufferCapacity = egressBufferCapacity;
// If not recursing, use a single producer thread so the FirstPassProducerRun loop runs only once
NumProducerThreads = notRecurse ? 1 : NumProducerThreadsFirstPass;
DownloaderProducerQueue = new QueueWrapper<DirectoryEntry>(NumProducerThreads);
DownloaderList = new List<DirectoryEntry>(DownloaderListCapacity);
if (FileTransferLog.IsDebugEnabled)
{
FileTransferLog.Debug($"FileTransfer.Downloader, Src: {SourcePath}, Dest: {DestPath}, Threads: {NumConsumerThreads}, TrackingProgress: {ProgressTracker != null}, OverwriteIfExist: {DoOverwrite == IfExists.Overwrite}");
}
}
/// <summary>
/// Verifies whether the input is a directory or a file. If it is a file then there is no need to start the producer.
/// </summary>
/// <returns>True if we need to start the producer threads</returns>
protected override bool StartEnumeration()
{
DirectoryEntry dir = Client.GetDirectoryEntry(SourcePath);
if (dir.Type == DirectoryEntryType.FILE)
{
long fileSizeToTransfer;
long chunks = AddFileToConsumerQueue(dir.FullName, dir.Length, dir.Length > SkipChunkingWeightThreshold, out fileSizeToTransfer);
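// Single-file transfer: report one file, one unchunked file if it was small enough to skip chunking,
// its chunk count, and the number of bytes scheduled for this run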
StatusUpdate(1, dir.Length <= SkipChunkingWeightThreshold ? 1 : 0, chunks, fileSizeToTransfer, 0);
return false;
}
if (!IngressOrEgressTest && File.Exists(DestPath))
{
throw new IOException("The destination path is an existing file. It should be a directory");
}
if (!IngressOrEgressTest)
{
Directory.CreateDirectory(DestPath);
}
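// Seed the producer queue with the root directory; the first-pass producer threads expand it from there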
DownloaderProducerQueue.Add(dir);
return true;
}
#region UnitTestMethods
// Testing purpose
internal static TransferStatus Download(string srcPath, string destPath, AdlsClient client, bool forceChunking, bool forceNotChunking, int numThreads = -1,
IProgress<TransferStatus> progressTracker = null, IfExists shouldOverwrite = IfExists.Overwrite, bool notRecurse = false, bool resume = false, bool egressTest = false, int egressBufferCapacity = 4 * 1024 * 1024, long chunkSize = ChunkSizeDefault)
{
if (forceChunking && forceNotChunking)
{
throw new ArgumentException("Both of them cant be true");
}
if (forceChunking)
{
SkipChunkingWeightThreshold = ChunkSizeDefault;
NumLargeFileThreshold = Int64.MaxValue;
}
else if (forceNotChunking)
{
SkipChunkingWeightThreshold = Int64.MaxValue;
}
return new FileDownloader(srcPath, destPath, client, numThreads, shouldOverwrite, progressTracker, notRecurse, false, resume, default(CancellationToken), egressTest, egressBufferCapacity, chunkSize).RunTransfer();
}
#endregion
/// <summary>
/// Downloads a directory or file from the remote server to the local file system
/// </summary>
/// <param name="srcPath">Remote source path</param>
/// <param name="destPath">Local destination path</param>
/// <param name="client">ADLS client</param>
/// <param name="numThreads">Number of threads- if not passed will take default number of threads</param>
/// <param name="shouldOverwrite">Whether to overwrite or skip if the destination </param>
/// <param name="progressTracker">Progresstracker to track progress of file transfer</param>
/// <param name="notRecurse">If true then does enumeration till one level only, else will do recursive enumeration</param>
/// <param name="disableTransferLogging"></param>
/// <param name="resume">If true we are resuming a previously interrupted upload process</param>
/// <param name="cancelToken">Cancellation Token</param>
/// <param name="egressTest">Egress test when we do not write file to local file system</param>
/// <param name="egressBufferCapacity">Egress buffer size - Size of the read reuest from server</param>
/// <param name="chunkSize">Chunk Size used for chunking</param>
/// <returns>Transfer status of the download</returns>
internal static TransferStatus Download(string srcPath, string destPath, AdlsClient client, int numThreads = -1, IfExists shouldOverwrite = IfExists.Overwrite, IProgress<TransferStatus> progressTracker = null, bool notRecurse = false, bool disableTransferLogging = false, bool resume = false, CancellationToken cancelToken = default(CancellationToken), bool egressTest = false, int egressBufferCapacity = 4 * 1024 * 1024, long chunkSize = ChunkSizeDefault)
{
if (!egressTest && string.IsNullOrWhiteSpace(destPath))
{
throw new ArgumentException("The destination path is null or empty", nameof(destPath));
}
if (srcPath.EndsWith("/"))
{
srcPath = srcPath.Substring(0, srcPath.Length - 1);
}
if (!string.IsNullOrEmpty(destPath) && destPath.EndsWith($"{Path.DirectorySeparatorChar}"))
{
destPath = destPath.Substring(0, destPath.Length - 1);
}
var downloader = new FileDownloader(srcPath, destPath, client, numThreads, shouldOverwrite, progressTracker,
notRecurse, disableTransferLogging, resume, cancelToken, egressTest, egressBufferCapacity, chunkSize);
return downloader.RunTransfer();
}
/// <summary>
/// Replaces the remote directory separator in the input path with the directory separator of the local file system
/// </summary>
/// <param name="relativePath">Input path</param>
/// <returns></returns>
protected override string GetDestDirectoryFormatted(string relativePath)
{
return relativePath.Replace('/', GetDestDirectorySeparator());
}
/// <summary>
/// Gets the directory separator for local file system
/// </summary>
/// <returns></returns>
protected override char GetDestDirectorySeparator()
{
return Path.DirectorySeparatorChar;
}
/// <summary>
/// Adds the concat jobs for downloader
/// </summary>
/// <param name="source">Source file path</param>
/// <param name="chunkSegmentFolder">Temporary destination file name</param>
/// <param name="dest">Destination file</param>
/// <param name="totSize">Total size of the file- needed for verification of the copy</param>
/// <param name="totalChunks">Total number of chunks</param>
/// <param name="doUploadRenameOnly"></param>
internal override void AddConcatJobToQueue(string source, string chunkSegmentFolder, string dest, long totSize,
long totalChunks, bool doUploadRenameOnly=false)
{
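// doUploadRenameOnly applies only to uploads and is intentionally ignored here; the trailing false
// presumably marks this ConcatenateJob as the download-side variant (an assumption; the flag's
// meaning is defined in ConcatenateJob)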
ConsumerQueue.Add(new ConcatenateJob(source, chunkSegmentFolder, dest, Client, totSize, totalChunks, false));
}
/// <summary>
/// Adds the directory entry to internal list
/// </summary>
/// <param name="dir">DirectoryEntry</param>
private void AddDirectoryEntryToList(DirectoryEntry dir)
{
lock (DownloaderList)
{
DownloaderList.Add(dir);
}
}
/// <summary>
/// Always chunking during download is not an efficient choice, because multiple threads writing to different offsets of a file on the local file system is very slow.
/// Chunking only makes sense when there is a small number of very large files. A file larger than ChunkWeightThreshold is defined as a large file.
/// If the number of files larger than ChunkWeightThreshold is less than NumLargeFileThreshold then we do chunking. Files whose sizes are at or below SkipChunkingWeightThreshold
/// never need chunking, and if we have a large number of large files we can also do without chunking.
/// In the first pass, producer threads traverse the directory tree and either store each entry in an internal list or add it as a non-chunked job to the job queue, depending on its size.
/// Runs on multiple threads.
/// </summary>
protected override void FirstPassProducerRun()
{
do
{
if (CancelToken.IsCancellationRequested)
{
return;
}
var der = DownloaderProducerQueue.Poll();
if (der == null) //Means end of Producer
{
DownloaderProducerQueue.Add(null); //Notify if any other threads are waiting
return;
}
try
{
long numDirs = 0, numFiles = 0, totChunks = 0, unchunkedFiles = 0, totSize = 0;
bool hasEntries = false;
var fop = Client.EnumerateDirectory(der.FullName);
foreach (var dir in fop)
{
hasEntries = true;
if (dir.Type == DirectoryEntryType.DIRECTORY)
{
if (NotRecurse)// Directly add the directory to be created since we won't recurse into it
{
if (!AddDirectoryToConsumerQueue(dir.FullName, false))
{
continue;
}
}
else
{
DownloaderProducerQueue.Add(dir);
}
numDirs++;
}
else
{
if (RecordedMetadata.EntryTransferredSuccessfulLastTime(dir.FullName))
{
continue;
}
// We count the total number of files here, even though some files are only added as chunked or non-chunked jobs in the final producer pass
numFiles++;
long fileSizeToTransfer = 0;
// If we are resuming and this file was chunked but left incomplete last time, then chunk it again this time
if (RecordedMetadata.EntryTransferredIncompleteLastTime(dir.FullName))
{
long chunks = AddFileToConsumerQueue(dir.FullName, dir.Length, true, out fileSizeToTransfer);
totChunks += chunks;
}
// If the length is at or below the skip-chunking weight threshold then add the file directly to the job queue as a non-chunked job
else if (dir.Length <= SkipChunkingWeightThreshold)
{
AddFileToConsumerQueue(dir.FullName, dir.Length, false, out fileSizeToTransfer);
unchunkedFiles++;
} // We only update totSize for the chunks or unchunked files that will get transferred this pass
else // We are not sure, so we will store them in internal list
{
if (dir.Length > ChunkWeightThreshold)
{
Interlocked.Increment(ref _numLargeFiles);
}
AddDirectoryEntryToList(dir);
}
totSize += fileSizeToTransfer;
}
}
bool isDirectoryEmptyAndNotDownloadedYet = false;
if (!hasEntries)
{
isDirectoryEmptyAndNotDownloadedYet = AddDirectoryToConsumerQueue(der.FullName, false);
}
// If NotRecurse, count the enumerated subdirectories; otherwise count this directory only if it was empty and newly added to the queue
StatusUpdate(numFiles, unchunkedFiles, totChunks, totSize, NotRecurse ? numDirs : (isDirectoryEmptyAndNotDownloadedYet ? 1 : 0));
}
catch (AdlsException ex)
{
Status.EntriesFailed.Add(new SingleEntryTransferStatus(der.FullName, null, ex.Message,
EntryType.Directory, SingleChunkStatus.Failed));
}
} while (!NotRecurse);
}
/// <summary>
/// Run by one thread only. Traverse the internal list and add chunked or non-chunked jobs depending on the criteria
/// </summary>
protected override void FinalPassProducerRun()
{
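// Chunk the deferred mid-size files only when there are few large files; when large files are
// plentiful, every thread stays busy on whole-file jobs anyway and chunking would only slow down
// local writes (see the class-level chunking rationale above)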
bool isToBeChunked = _numLargeFiles < NumLargeFileThreshold;
long totChunks = 0, unchunkedFiles = 0, totSize = 0;
foreach (var dir in DownloaderList)
{
if (CancelToken.IsCancellationRequested)
{
return;
}
long fileSizeToTransfer;
long chunks = AddFileToConsumerQueue(dir.FullName, dir.Length, isToBeChunked, out fileSizeToTransfer);
totChunks += chunks;
totSize += fileSizeToTransfer;
if (!isToBeChunked)
{
unchunkedFiles++;
}
}
StatusUpdate(0, unchunkedFiles, totChunks, totSize, 0);
}
// Creates metadata for the downloader; alreadyChunksTransfered is greater than -1 only when resuming a chunked file that was left incomplete last time
internal override FileMetaData AssignMetaData(string fileFullName, string chunkSegmentFolder, string destPath, long fileLength, long numChunks, long alreadyChunksTransfered = -1)
{
return new FileMetaData(fileFullName, chunkSegmentFolder, destPath, fileLength, this, numChunks, alreadyChunksTransfered, IngressOrEgressTest, EgressBufferCapacity);
}
}
}