// AdlsDotNetSDK/FileTransfer/Jobs/ConcatenateJob.cs
using System.Collections.Generic;
using System;
using System.IO;
using System.Net;
using Microsoft.Azure.DataLake.Store.RetryPolicies;
namespace Microsoft.Azure.DataLake.Store.FileTransfer.Jobs
{
/// <summary>
/// For uploader concatenates the chunks and for downloader renames the temporary filename.
/// </summary>
internal class ConcatenateJob : BaseJob
{
    /// <summary>
    /// Source path of the transferred entry; used only for reporting in the returned status.
    /// </summary>
    private string Source { get; }
    /// <summary>
    /// For uploader:
    /// 1) When files are chunked this is the directory name where chunks are saved.
    /// For downloader:
    /// 1) When files are chunked, temporary GUID name.
    ///    When not chunked, a concat job is not created.
    /// </summary>
    private string ChunkSegmentFolder { get; }
    /// <summary>
    /// Destination path
    /// </summary>
    private string Destination { get; }
    /// <summary>
    /// Adls client
    /// </summary>
    private AdlsClient Client { get; }
    /// <summary>
    /// Total file size - used for uploader or downloader verification
    /// </summary>
    private long FileSize { get; }
    /// <summary>
    /// Whether it is upload or download
    /// </summary>
    private bool IsUpload { get; }
    /// <summary>
    /// Total number of chunks
    /// </summary>
    private readonly long _totalChunks;
    /// <summary>
    /// When true (resume scenario) the chunks are already concatenated server-side and
    /// only the final rename to the destination is needed.
    /// </summary>
    private readonly bool _performUploadRenameOnly;
    /// <summary>
    /// Number of additional concat attempts after the first failure.
    /// </summary>
    private const int UploadRetryTime = 1;

    internal ConcatenateJob(string source, string chunkSegmentFolder, string dest, AdlsClient client, long size, long totalChunks, bool isUpload, bool doUploadRenameOnly = false) : base(size)
    {
        Source = source;
        ChunkSegmentFolder = chunkSegmentFolder;
        Destination = dest;
        Client = client;
        FileSize = size;
        _totalChunks = totalChunks;
        IsUpload = isUpload;
        _performUploadRenameOnly = doUploadRenameOnly;
    }

    /// <summary>
    /// For upload: concatenates the chunks and renames the result to the destination.
    /// For download: renames the temporary file to the destination.
    /// </summary>
    /// <returns><see cref="SingleEntryTransferStatus"/> describing success or failure of this entry</returns>
    protected override object DoJob()
    {
        try
        {
            // Note: We will never reach here if the destination exists and we have elected
            // to not overwrite the destination if it exists
            if (IsUpload)
            {
                return PerformUploadJob();
            }
            // DOWNLOAD CASE
            // Deletes the destination and renames the file from a temporary GUID name to the destination
            File.Delete(Destination);
            File.Move(ChunkSegmentFolder, Destination);
            if (VerifyLocalExist())
            {
                return new SingleEntryTransferStatus(Source, Destination, "", EntryType.File, SingleChunkStatus.Successful);
            }
            return new SingleEntryTransferStatus(Source, Destination, "Size did not match", EntryType.File, SingleChunkStatus.Failed);
        }
        catch (Exception e)
        {
            // Deliberate broad catch: any failure is surfaced as a failed transfer status, not a crash.
            return new SingleEntryTransferStatus(Source, Destination, e.Message, EntryType.File, SingleChunkStatus.Failed);
        }
    }

    /// <summary>
    /// Verify whether the input file's length and file length after download are same
    /// </summary>
    /// <returns>True if it matches else false</returns>
    private bool VerifyLocalExist()
    {
        return new FileInfo(Destination).Length == FileSize;
    }

    /// <summary>
    /// Verify whether the file exists on ADLS and its length matches the expected size.
    /// </summary>
    /// <param name="destination">Remote path to check</param>
    /// <returns>True if the entry exists and its length matches; false if it does not exist or the size differs</returns>
    /// <exception cref="AdlsException">Rethrown for any server error other than 404 NotFound</exception>
    private bool VerifyAdlExists(string destination)
    {
        try
        {
            return Client.GetDirectoryEntry(destination).Length == FileSize;
        }
        catch (AdlsException excep)
        {
            if (excep.HttpStatus == HttpStatusCode.NotFound)
            {
                return false;
            }
            // Fix: `throw excep;` would reset the stack trace (CA2200); `throw;` preserves it.
            throw;
        }
    }

    // Upload: Concats all the chunks into a temporary guid name. Then renames it to the destination to overwrite it
    private SingleEntryTransferStatus PerformUploadJob()
    {
        // If only do rename then go do that - this will only happen in resume cases
        if (_performUploadRenameOnly || PerformConcatWithRetries(out AdlsException adlsExcep))
        {
            string destGuid = ChunkSegmentFolder + FileUploader.DestTempGuidForConcat;
            try
            {
                // This call is with retries
                Client.Rename(destGuid, Destination, true);
                if (VerifyAdlExists(Destination))
                {
                    return new SingleEntryTransferStatus(Source, Destination, "", EntryType.File,
                        SingleChunkStatus.Successful);
                }
                return new SingleEntryTransferStatus(Source, Destination, "Size did not match", EntryType.File,
                    SingleChunkStatus.Failed);
            }
            catch (AdlsException excep)
            {
                adlsExcep = excep;
            }
        }
        // adlsExcep is definitely assigned here: either PerformConcatWithRetries set it (and it is
        // never null when that method returns false), or the catch above assigned it.
        return new SingleEntryTransferStatus(Source, Destination, adlsExcep.Message, EntryType.File, SingleChunkStatus.Failed);
    }

    // Perform concat with retries. Currently retries only once. If the concat fails, checks whether it can be
    // retried based on the HTTP status code. If retriable, checks whether the destination already exists and the
    // source is deleted (i.e. the concat actually succeeded server-side despite the error). If there is no
    // intermediate state then returns true.
    private bool PerformConcatWithRetries(out AdlsException excep)
    {
        var retryPolicy = new ExponentialRetryPolicy();
        string destGuid = ChunkSegmentFolder + FileUploader.DestTempGuidForConcat;
        // Chunk files are named 0.._totalChunks-1 under the chunk segment folder.
        var chunkList = new List<string>((int)_totalChunks);
        for (int i = 0; i < _totalChunks; i++)
        {
            chunkList.Add(ChunkSegmentFolder + "/" + i);
        }
        int retries = 0;
        do
        {
            excep = PerformConcatSingle(chunkList, destGuid);
            if (excep == null)
            {
                return true;
            }
            if (!retryPolicy.ShouldRetryBasedOnHttpOutput((int)excep.HttpStatus, excep.Ex))
            {
                return false;
            }
            if (VerifyAdlExists(destGuid))
            {
                // Destination exists: if the source folder is also still present we are in an
                // unrecoverable intermediate state; otherwise the concat actually succeeded.
                return !Client.CheckExists(ChunkSegmentFolder);
            }
        } while (retries++ < UploadRetryTime);
        return false;
    }

    /// <summary>
    /// Runs a single concatenate attempt, converting an <see cref="AdlsException"/> into a return value.
    /// </summary>
    /// <param name="fileList">Ordered list of chunk paths to concatenate</param>
    /// <param name="destGuid">Temporary destination path for the concatenated file</param>
    /// <returns>Null on success; the thrown <see cref="AdlsException"/> on failure</returns>
    private AdlsException PerformConcatSingle(List<string> fileList, string destGuid)
    {
        try
        {
            Client.ConcatenateFilesParallelAsync(destGuid, fileList).GetAwaiter().GetResult();
            return null;
        }
        catch (AdlsException ex)
        {
            return ex;
        }
    }

    protected override string JobType()
    {
        return "FileTransfer.ConcatJob";
    }

    protected override string JobDetails()
    {
        return $"Source: {ChunkSegmentFolder}, Destination: {Destination}, Length: {FileSize}";
    }
}
}