AdlsDotNetSDK/FileTransfer/FileMetaData.cs (204 lines of code) (raw):

using System;
using System.IO;
using System.Text;
using Microsoft.Win32.SafeHandles;
using System.Runtime.InteropServices;
using System.Threading;
using System.ComponentModel;

namespace Microsoft.Azure.DataLake.Store.FileTransfer
{
    /// <summary>
    /// Metadata of a single file being transferred: source path, destination path, chunk segment folder,
    /// total file length, plus the bookkeeping used to track chunk progress and resume state.
    /// </summary>
    internal class FileMetaData
    {
        /// <summary>
        /// Win32 FSCTL_SET_SPARSE control code (0x000900C4 == 590020), passed to DeviceIoControl to mark a file sparse.
        /// </summary>
        private const int FsctlSetSparse = 0x000900C4;

        /// <summary>
        /// For uploader:
        /// 1) When files are chunked this is the directory name where chunks are saved.
        /// 2) When files are not chunked this is null.
        /// For downloader:
        /// 1) When files are chunked this is a temporary GUID name.
        /// 2) When not chunked, same as uploader.
        /// </summary>
        internal string ChunkSegmentFolder { get; }

        /// <summary>
        /// Source full path
        /// </summary>
        internal string SrcFile { get; }

        /// <summary>
        /// Destination path
        /// </summary>
        internal string Dest { get; }

        /// <summary>
        /// Total size of the file
        /// </summary>
        internal long TotSize { get; }

        /// <summary>
        /// Total number of chunks of the file (0 means the file is not chunked)
        /// </summary>
        internal readonly long TotalChunks;

        /// <summary>
        /// Number of chunks transferred. When all the chunks are transferred the concat job is queued.
        /// </summary>
        private long _numChunksTransfered;

        /// <summary>
        /// Number of chunks that were already transferred before this run (0 if the file was never attempted)
        /// </summary>
        internal readonly long StartChunksAlreadyTransfered;

        /// <summary>
        /// Cached flag that stores whether the destination file exists. It is null when no information is known yet.
        /// Once it is not null, we do not need to make file system calls to know whether the file exists.
        /// </summary>
        private bool? _fileExists;

        /// <summary>
        /// Download specific flag. If true, the temporary file has already been created on the local file system;
        /// if false, it still needs to be created.
        /// This is necessary because for chunked downloads more than one thread writes at different offsets to the same local file.
        /// </summary>
        private bool _downloadTempFileExists;

        private readonly Object _lock = new Object();

        internal FileTransferCommon Transfer { get; }

        /// <summary>
        /// Whether this is upload or download
        /// </summary>
        internal bool IsUpload { get; }

        /// <summary>
        /// If true, for download we do not want to write the file to the local filesystem; for upload we want to read from a random stream
        /// </summary>
        internal bool IngressOrEgressTest { get; }

        /// <summary>
        /// Download specific. Optional buffer size supplied for the download (egress) path; null when the caller did not provide one.
        /// </summary>
        internal long? EgressBufferSize { get; }

        // Upload specific: encoding of the source file (set by the upload constructor)
        internal Encoding EncodeType;

        // Upload specific: whether the source file is binary (set by the upload constructor)
        internal bool IsBinary;

        // This will be true if a chunked file is being resumed after it was left incomplete last time
        internal bool IsFileHalfDone;

        // Cached result of ResumeUpload/ResumeDownload; null until one of them has run
        private bool? _shouldFileBeResumed;

        // Irrecoverable error encountered during resume; null if none
        internal string UnexpectedTransferErrorResume;

        /// <summary>
        /// Returns the path a chunk is transferred to. Chunked uploads use "{ChunkSegmentFolder}/{index}",
        /// chunked downloads write every chunk into the single temporary file ChunkSegmentFolder,
        /// and non-chunked transfers use Dest directly.
        /// </summary>
        /// <param name="index">Index of the chunk</param>
        internal string GetChunkFileName(int index)
        {
            return ChunkSegmentFolder != null ? (IsUpload ? ChunkSegmentFolder + "/" + index : ChunkSegmentFolder) : Dest;
        }

        /// <summary>
        /// Download constructor.
        /// </summary>
        /// <param name="startChunkIndx">Number of chunks already transferred by a previous attempt, or -1 if the file was never attempted</param>
        /// <param name="egressBuffer">Optional download buffer size</param>
        internal FileMetaData(string src, string chunkSegmentFolder, string dest, long totSize, FileTransferCommon transfer, long totChunks, long startChunkIndx, bool ingressOrEgressTest = false, long? egressBuffer = null)
            : this(src, chunkSegmentFolder, dest, totSize, transfer, totChunks, false, startChunkIndx, ingressOrEgressTest)
        {
            EgressBufferSize = egressBuffer;
        }

        /// <summary>
        /// Upload constructor.
        /// </summary>
        /// <param name="startChunkIndx">Number of chunks already transferred by a previous attempt, or -1 if the file was never attempted</param>
        internal FileMetaData(string src, string chunkSegmentFolder, string dest, long totSize, FileTransferCommon transfer, long totChunks, bool isBinary, Encoding encodeType, long startChunkIndx, bool ingressOrEgressTest = false)
            : this(src, chunkSegmentFolder, dest, totSize, transfer, totChunks, true, startChunkIndx, ingressOrEgressTest)
        {
            IsBinary = isBinary;
            EncodeType = encodeType;
        }

        private FileMetaData(string src, string chunkSegmentFolder, string dest, long totSize, FileTransferCommon transfer, long totChunks, bool isUpload, long startChunkIndx, bool ingressOrEgressTest)
        {
            ChunkSegmentFolder = chunkSegmentFolder;
            Dest = dest;
            Transfer = transfer;
            TotSize = totSize;
            SrcFile = src;
            TotalChunks = totChunks;
            IsUpload = isUpload;
            IngressOrEgressTest = ingressOrEgressTest;
            // startChunkIndx = -1 means this file was not attempted before so effectively 0 chunks were done; if it is >= 0 then it was attempted before
            if (startChunkIndx >= 0)
            {
                IsFileHalfDone = true;
            }
            _numChunksTransfered = StartChunksAlreadyTransfered = startChunkIndx < 0 ? 0 : startChunkIndx;
        }

        /// <summary>
        /// Returns whether a chunked transfer of this file should be skipped because the destination already exists.
        /// If overwrite is requested there is never a need to skip; otherwise the file is skipped when the destination exists.
        /// This method is only necessary for chunked file transfers because: for the uploader we do not want to create the temporary
        /// chunks if the destination exists and the user wants IfExists.Fail; for the downloader we do not want different threads
        /// to write to the temp file if the destination exists and the user wants IfExists.Fail.
        /// </summary>
        /// <param name="client">Client used to check whether the remote destination exists (upload only)</param>
        /// <returns>True if the destination exists and overwrite was not requested</returns>
        internal bool ShouldSkipForChunkedFile(AdlsClient client)
        {
            if (Transfer.DoOverwrite == IfExists.Overwrite)
            {
                return false;
            }
            lock (_lock)
            {
                // Cache the result to prevent unnecessary server/file system requests for the same file
                if (_fileExists != null)
                {
                    return _fileExists.Value;
                }
                _fileExists = IsUpload ? client.CheckExists(Dest) : File.Exists(Dest);
                return _fileExists.Value;
            }
        }

        // https://msdn.microsoft.com/en-us/library/windows/desktop/aa364596(v=vs.85).aspx
        // P/Invoke used to mark a file as sparse
        [DllImport("Kernel32.dll", SetLastError = true, CharSet = CharSet.Auto)]
        private static extern bool DeviceIoControl(
            SafeFileHandle hDevice,
            int dwIoControlCode,
            IntPtr inBuffer,
            int nInBufferSize,
            IntPtr outBuffer,
            int nOutBufferSize,
            ref int pBytesReturned,
            [In] ref NativeOverlapped lpOverlapped);

        /// <summary>
        /// Marks the file sparse when we create the local file during download. Otherwise, with threads writing to the same file
        /// at different offsets, the gaps get backfilled with zeros, meaning the file is effectively written twice, which halves performance.
        /// Windows only; the caller is responsible for the platform check.
        /// </summary>
        /// <param name="fileHandle">Handle of the freshly created local file</param>
        internal void MarkFileSparse(SafeFileHandle fileHandle)
        {
            int bytesReturned = 0;
            NativeOverlapped lpOverlapped = new NativeOverlapped();
            // The return value is deliberately ignored: failing to mark a file sparse only costs performance,
            // so it should not fail the download.
            DeviceIoControl(fileHandle, FsctlSetSparse, IntPtr.Zero, 0, IntPtr.Zero, 0, ref bytesReturned, ref lpOverlapped);
        }

        /// <summary>
        /// For downloader. For chunked downloads: creates the temporary file if it does not exist, otherwise opens it for write.
        /// This is necessary since multiple threads write to the same file at different offsets. The first thread to get the lock
        /// creates the file; the remaining threads open the existing file for write (shared read/write).
        /// For non-chunked downloads: creates the parent directory and the destination file.
        /// </summary>
        /// <returns>A writable stream the caller owns and must dispose</returns>
        internal Stream CreateOrOpenDownloadFile()
        {
            string dest = GetChunkFileName(0);
            // Non-chunked file download
            if (TotalChunks == 0)
            {
                Utils.CreateParentDirectory(dest);
                return new FileStream(dest, Transfer.DoOverwrite == IfExists.Overwrite ? FileMode.Create : FileMode.CreateNew, FileAccess.Write);
            }
            lock (_lock)
            {
                // If the file was reported (in the log file) as attempted during the last download, the temp file must exist,
                // unless it was explicitly deleted by the user (then FileMode.Open throws)
                if (IsFileHalfDone || _downloadTempFileExists)
                {
                    return new FileStream(dest, FileMode.Open, FileAccess.Write, FileShare.ReadWrite);
                }
                Utils.CreateParentDirectory(dest);
                var fs = new FileStream(dest, FileMode.Create, FileAccess.Write, FileShare.ReadWrite);
                try
                {
                    if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
                    {
                        MarkFileSparse(fs.SafeFileHandle);
                    }
                }
                catch
                {
                    // Do not leak the file handle if marking the file sparse fails unexpectedly; the exception still propagates
                    fs.Dispose();
                    throw;
                }
                _downloadTempFileExists = true;
                return fs;
            }
        }

        /// <summary>
        /// Updates the number of chunks uploaded or downloaded. The first chunk transferred in this run writes a begin record
        /// to the metadata log; once all chunks are done the concat job is queued. Does nothing for egress-test downloads.
        /// </summary>
        internal void UpdateChunk()
        {
            if (!IsUpload && IngressOrEgressTest)
            {
                return;
            }
            lock (_lock)
            {
                // First chunk of this run has been transferred, so put a record in the metadata log file
                if (_numChunksTransfered == StartChunksAlreadyTransfered)
                {
                    Transfer.AddBeginRecord(SrcFile, ChunkSegmentFolder);
                }
                _numChunksTransfered++;
                if (_numChunksTransfered >= TotalChunks)
                {
                    // Add the concatenate job to the priority queue of the transfer
                    Transfer.AddConcatJobToQueue(SrcFile, ChunkSegmentFolder, Dest, TotSize, TotalChunks);
                }
            }
        }

        /// <summary>
        /// Decides, once per file (the result is cached), whether a previously attempted chunked upload should continue transferring chunks.
        /// Depending on what exists remotely it may instead queue the full concat job, queue only the rename part of it,
        /// record the file as complete, or store an irrecoverable error in <see cref="UnexpectedTransferErrorResume"/>.
        /// </summary>
        /// <param name="client">Client used to check remote existence of the chunk folder, concat temp destination and final destination</param>
        /// <returns>True only if chunk transfer for this file should continue</returns>
        internal bool ResumeUpload(AdlsClient client)
        {
            lock (_lock)
            {
                if (_shouldFileBeResumed != null)
                {
                    return _shouldFileBeResumed.Value;
                }
                bool chunkTempDestExists = client.CheckExists(ChunkSegmentFolder);
                bool chunkConcatTempDestExists = client.CheckExists(ChunkSegmentFolder + FileUploader.DestTempGuidForConcat);
                _shouldFileBeResumed = false;
                // Chunks of a file are resumed only if the chunk segment folder exists, the temp concat destination does not exist,
                // and not all chunks were reported to be done
                if (chunkTempDestExists)
                {
                    if (chunkConcatTempDestExists)
                    {
                        // Both cannot exist at once - probably a problem with concat
                        UnexpectedTransferErrorResume = $"{SrcFile}: Irrecoverable error, Concat is in an intermediate state. Please trasnfer this file without resume flag.";
                    }
                    else if (StartChunksAlreadyTransfered == TotalChunks)
                    {
                        // All chunks are done: add the full concat job and we are done
                        Transfer.AddConcatJobToQueue(SrcFile, ChunkSegmentFolder, Dest, TotSize, TotalChunks);
                    }
                    else
                    {
                        // Continue with the job
                        _shouldFileBeResumed = true;
                    }
                }
                else if (chunkConcatTempDestExists)
                {
                    // We only need to add the rename part of the concat job
                    Transfer.AddConcatJobToQueue(SrcFile, ChunkSegmentFolder, Dest, TotSize, TotalChunks, true);
                }
                else if (!client.CheckExists(Dest))
                {
                    // At this stage dest has to exist.
                    // Something really wrong happened - probably the rename failed even after retries, leaving an intermediate state
                    UnexpectedTransferErrorResume = $"{SrcFile}: Irrecoverable error, Rename is in an intermediate state. Please trasnfer this file without resume flag.";
                }
                else
                {
                    // All chunks were transferred and the rename succeeded; record completion so these checks are not repeated for the file
                    Transfer.AddCompleteRecord(SrcFile, true);
                }
                return _shouldFileBeResumed.Value;
            }
        }

        /// <summary>
        /// Decides, once per file (the result is cached), whether a previously attempted chunked download should continue transferring chunks.
        /// Depending on what exists locally it may instead queue the concat job, record the file as complete,
        /// or store an irrecoverable error in <see cref="UnexpectedTransferErrorResume"/>.
        /// </summary>
        /// <returns>True only if chunk transfer for this file should continue</returns>
        internal bool ResumeDownload()
        {
            lock (_lock)
            {
                if (_shouldFileBeResumed != null)
                {
                    return _shouldFileBeResumed.Value;
                }
                bool chunkTempDestExists = File.Exists(ChunkSegmentFolder);
                _shouldFileBeResumed = false;
                if (chunkTempDestExists)
                {
                    if (StartChunksAlreadyTransfered == TotalChunks)
                    {
                        // All chunks are done: add the concat job
                        Transfer.AddConcatJobToQueue(SrcFile, ChunkSegmentFolder, Dest, TotSize, TotalChunks);
                    }
                    else
                    {
                        // Continue the job
                        _shouldFileBeResumed = true;
                    }
                }
                else if (!File.Exists(Dest))
                {
                    // At this stage dest has to exist.
                    // Something really wrong happened - probably the rename failed even after retries, leaving an intermediate state
                    UnexpectedTransferErrorResume = $"{SrcFile}: Irrecoverable error, Rename is in an intermediate state. Please trasnfer this file without resume flag.";
                }
                else
                {
                    // All chunks were transferred and the rename succeeded; record completion so these checks are not repeated
                    Transfer.AddCompleteRecord(SrcFile, true);
                }
                return _shouldFileBeResumed.Value;
            }
        }
    }
}