AdlsDotNetSDK/FileTransfer/FileUploader.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using Microsoft.Azure.DataLake.Store.QueueTools;
using Microsoft.Azure.DataLake.Store.FileTransfer.Jobs;
namespace Microsoft.Azure.DataLake.Store.FileTransfer
{
/// <summary>
/// Class that implements the uploader-specific logic for file transfer
/// </summary>
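/// <remarks>
/// As the code below shows, the uploader runs a producer-consumer pipeline: producer threads walk the
/// local directory tree through UploaderProducerQueue, while consumer threads drain the job queue,
/// uploading files either whole or in chunks (files larger than the chunk size are split and later
/// stitched back together by a concatenate job).
/// </remarks>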
internal sealed class FileUploader : FileTransferCommon
{
/// <summary>
/// FIFO queue of directories awaiting enumeration; serves as the producer queue for the uploader
/// </summary>
private QueueWrapper<DirectoryInfo> UploaderProducerQueue { get; }
private const int NumProducerThreadsFirstPass = 1;
private readonly bool _isBinary;
private readonly Encoding _encodeType;
internal const string DestTempGuidForConcat = "ConcatGuid";
private FileUploader(string srcPath, string destPath, AdlsClient client, int numThreads,
    IfExists doOverwrite, IProgress<TransferStatus> progressTracker, bool notRecurse,
    bool disableTransferLogging, bool resume, bool isBinary, CancellationToken cancelToken,
    bool ingressTest, long chunkSize)
    : base(srcPath, destPath, client, numThreads, doOverwrite, progressTracker, notRecurse,
        disableTransferLogging, resume, ingressTest, chunkSize,
        Path.Combine(Path.GetTempPath(), ".adl", "Upload",
            GetTransferLogFileName(client.AccountFQDN, srcPath, destPath, Path.DirectorySeparatorChar, '/')),
        cancelToken, $"binary:{isBinary}")
{
// If NotRecurse, use a single producer thread; the FirstPassProducerRun loop then executes only once
NumProducerThreads = NotRecurse ? 1 : NumProducerThreadsFirstPass;
UploaderProducerQueue = new QueueWrapper<DirectoryInfo>(NumProducerThreads);
if (FileTransferLog.IsDebugEnabled)
{
FileTransferLog.Debug($"FileTransfer.Uploader, Src: {SourcePath}, Dest: {DestPath}, Threads: {NumConsumerThreads}, TrackingProgress: {ProgressTracker != null}, OverwriteIfExist: {DoOverwrite == IfExists.Overwrite}");
}
_isBinary = isBinary;
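// Text (non-binary) uploads are chunked at record boundaries; UTF-8 is assumed when decoding those boundaries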
_encodeType = Encoding.UTF8;
}
/// Replaces the local directory separator in the input path by the directory separator for remote file system
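/// e.g. on Windows a local path "data\logs\2024" becomes "data/logs/2024" for ADLS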
protected override string GetDestDirectoryFormatted(string ipPath)
{
return ipPath.Replace(Path.DirectorySeparatorChar, GetDestDirectorySeparator());
}
/// <summary>
/// Gets the directory separator for the remote file system (ADLS)
/// </summary>
/// <returns>The '/' character</returns>
protected override char GetDestDirectorySeparator()
{
return '/';
}
/// <summary>
/// Upload directory or file from local to remote
/// </summary>
/// <param name="srcPath">Local source path</param>
/// <param name="destPath">Remote destination path</param>
/// <param name="client">ADLS client</param>
/// <param name="numThreads">Number of threads- if not passed will take default number of threads</param>
/// <param name="shouldOverwrite">Whether to overwrite or skip if the destination </param>
/// <param name="progressTracker">Progresstracker to track progress of file transfer</param>
/// <param name="notRecurse"> If true then just does a enumeration in first level</param>
/// <param name="disableTransferLogging"></param>
/// <param name="resume">If true we are resuming a previously interrupted upload process</param>
/// <param name="isBinary">If false then we want to upload at new line boundaries</param>
/// <param name="cancelToken">Cancellation Token</param>
/// <param name="ingressTest">True if we just want to test ingress</param>
/// <param name="chunkSize">Chunk Size used for chunking</param>
/// <returns>Transfer Status of the upload</returns>
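/// <example>
/// A minimal sketch of a call into this entry point. Within the SDK the public AdlsClient.BulkUpload
/// API is the usual route to this internal method; the credentials object <c>creds</c> is assumed
/// to exist and is not shown here:
/// <code>
/// AdlsClient client = AdlsClient.CreateClient("myaccount.azuredatalakestore.net", creds);
/// TransferStatus status = FileUploader.Upload(@"C:\local\data", "/remote/data", client,
///     numThreads: 8, isBinary: true);
/// Console.WriteLine($"Entries failed: {status.EntriesFailed.Count}");
/// </code>
/// </example>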
internal static TransferStatus Upload(string srcPath, string destPath, AdlsClient client, int numThreads = -1,
    IfExists shouldOverwrite = IfExists.Overwrite, IProgress<TransferStatus> progressTracker = null,
    bool notRecurse = false, bool disableTransferLogging = false, bool resume = false, bool isBinary = false,
    CancellationToken cancelToken = default(CancellationToken), bool ingressTest = false, long chunkSize = ChunkSizeDefault)
{
if (string.IsNullOrWhiteSpace(srcPath))
{
throw new ArgumentException(nameof(srcPath));
}
if (string.IsNullOrWhiteSpace(destPath))
{
throw new ArgumentException(nameof(destPath));
}
// Trim a trailing directory separator from the source path
if (srcPath.EndsWith($"{Path.DirectorySeparatorChar}"))
{
srcPath = srcPath.Substring(0, srcPath.Length - 1);
}
// Trim a trailing slash from the destination unless the destination is the root "/"
if (destPath.EndsWith("/") && !destPath.Equals("/"))
{
destPath = destPath.Substring(0, destPath.Length - 1);
}
var uploader = new FileUploader(srcPath, destPath, client, numThreads, shouldOverwrite, progressTracker, notRecurse, disableTransferLogging, resume, isBinary, cancelToken, ingressTest, chunkSize);
return uploader.RunTransfer();
}
/// Verifies whether the input is a directory or a file. If it is a file, there is no need to start the producer
protected override bool StartEnumeration()
{
if (File.Exists(SourcePath))
{
var file = new FileInfo(SourcePath);
// If the source path is relative, convert it to an absolute path
SourcePath = file.FullName;
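// Files larger than ChunkSize are split into chunk jobs and stitched back together by a concat job; smaller files upload as a single job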
long chunks = AddFileToConsumerQueue(file.FullName, file.Length, file.Length > ChunkSize, out long fileSizeToTransfer);
StatusUpdate(1, file.Length <= ChunkSize ? 1 : 0, chunks, fileSizeToTransfer, 0);
return false;
}
// Check whether the destination already exists as a file
DirectoryEntry entry = null;
try
{
entry = Client.GetDirectoryEntry(DestPath);
}
catch (AdlsException e)
{
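// A 404 only means the destination does not exist yet and will be created; anything else is a real failure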
if (e.HttpStatus != HttpStatusCode.NotFound)
{
throw; // rethrow without resetting the stack trace
}
}
if (entry?.Type == DirectoryEntryType.FILE)
{
throw new IOException("The destination path is an existing file. It should be a directory");
}
if (Directory.Exists(SourcePath))
{
DirectoryInfo dir = new DirectoryInfo(SourcePath);
// If the source path is relative, convert it to an absolute path
SourcePath = dir.FullName;
UploaderProducerQueue.Add(dir);
return true;
}
throw new FileNotFoundException(SourcePath);
}
/// <summary>
/// Adds the concat jobs for uploader
/// </summary>
/// <param name="source">Source file path</param>
/// <param name="chunkSegmentFolder">Temporary folder where chunks are located</param>
/// <param name="dest">Destination file</param>
/// <param name="totSize">Total size of the file- needed for verification of the copy</param>
/// <param name="totalChunks">Total number of chunks</param>
/// <param name="doUploadRenameOnly"></param>
internal override void AddConcatJobToQueue(string source, string chunkSegmentFolder, string dest, long totSize,
long totalChunks, bool doUploadRenameOnly = false)
{
ConsumerQueue.Add(new ConcatenateJob(source, chunkSegmentFolder, dest, Client, totSize, totalChunks, true, doUploadRenameOnly));
}
/// <summary>
/// Producer code that traverses the local directory tree and adds entries to the job queue as chunked or non-chunked jobs depending on their size. Currently this adds
/// jobs directly to the job queue; in the future we may collect files in an internal list and add them as jobs in FinalPassProducerRun.
/// </summary>
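/// <remarks>
/// Producer threads coordinate shutdown through a null "poison pill": a thread that polls null
/// re-adds null before returning, so every sibling producer also wakes up and exits. A minimal
/// sketch of the pattern, assuming QueueWrapper.Poll blocks until an item (or the pill) is
/// available; <c>Process</c> is a hypothetical stand-in for the per-directory work below:
/// <code>
/// while (true)
/// {
///     var item = queue.Poll();
///     if (item == null) { queue.Add(null); break; } // propagate shutdown to peers
///     Process(item); // enumerate the directory, queue jobs and sub-directories
/// }
/// </code>
/// </remarks>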
protected override void FirstPassProducerRun()
{
do
{
if (CancelToken.IsCancellationRequested)
{
return;
}
var dir = UploaderProducerQueue.Poll();
if (dir == null) //Means end of Producer
{
UploaderProducerQueue.Add(null); //Notify if any other threads are waiting
return;
}
try
{
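// isEmpty records whether the directory has any children; an empty directory must still be created explicitly on the remote store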
long numSubDirs = 0, isEmpty = 0;
IEnumerable<DirectoryInfo> enumDir = dir.EnumerateDirectories();
foreach (var subDir in enumDir)
{
isEmpty = 1;
if (NotRecurse) //Directly add the directories to be created since we won't go in recursively
{
if (!AddDirectoryToConsumerQueue(subDir.FullName, true))
{
continue;
}
}
else
{
UploaderProducerQueue.Add(subDir);
}
numSubDirs++;
}
IEnumerable<FileInfo> enumFiles = dir.EnumerateFiles();
long numFiles = 0, totChunks = 0, unchunkedFiles = 0, totSize = 0;
foreach (var file in enumFiles)
{
isEmpty = 1;
if (RecordedMetadata.EntryTransferredSuccessfulLastTime(file.FullName))
{
continue;
}
long fileSizeToTransfer;
long chunks = AddFileToConsumerQueue(file.FullName, file.Length, file.Length > ChunkSize, out fileSizeToTransfer);
totChunks += chunks;
if (file.Length <= ChunkSize)
{
unchunkedFiles++;
}
numFiles++;
totSize += fileSizeToTransfer;
}
bool isDirectoryEmptyAndNotUploadedYet = false;
if (isEmpty == 0)
{
isDirectoryEmptyAndNotUploadedYet = AddDirectoryToConsumerQueue(dir.FullName, true);
}
// Directory count for the status update: with NotRecurse every immediate subdirectory was queued for creation; otherwise count this directory itself only if it was empty and newly queued
StatusUpdate(numFiles, unchunkedFiles, totChunks, totSize, NotRecurse ? numSubDirs : (isDirectoryEmptyAndNotUploadedYet ? 1 : 0));
}
catch (Exception ex)
{
Status.EntriesFailed.Add(new SingleEntryTransferStatus(dir.FullName, null, ex.StackTrace,
EntryType.Directory, SingleChunkStatus.Failed));
}
} while (!NotRecurse);
}
/// Creates the metadata for the uploader; alreadyChunksTransfered is greater than -1 only when resuming a chunked file that a previous run left incomplete
internal override FileMetaData AssignMetaData(string fileFullName, string chunkSegmentFolder, string destPath,
long fileLength, long numChunks, long alreadyChunksTransfered = -1)
{
return new FileMetaData(fileFullName, chunkSegmentFolder, destPath, fileLength, this, numChunks, _isBinary, _encodeType, alreadyChunksTransfered);
}
/// <summary>
/// Currently this is not implemented; a constant chunk size is used. In the future we would like to use a
/// larger chunk size for very large files, i.e. some form of adaptive chunking.
/// </summary>
protected override void FinalPassProducerRun()
{
return;
}
}
}