AdlsDotNetSDKUnitTest/TransferUnitTest.cs (206 lines of code) (raw):
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using TestDataCreator;
using Microsoft.Azure.DataLake.Store.FileTransfer;
using Microsoft.Azure.DataLake.Store.FileTransfer.Jobs;
namespace Microsoft.Azure.DataLake.Store.UnitTest
{
[TestClass]
public class TransferUnitTest
{
// Client shared by every test in this class; assigned in SetupClassTests.
private static AdlsClient _adlsClient;
// Value of the "BasePath" TestContext property; used as the first segment of RemotePath.
private static string BasePath;
// Local test root for this run written as a relative path (".\<TestId>").
// NOTE(review): the separators are hard-coded backslashes, so these paths look Windows-only.
private static readonly string LocalRelativePath = @".\" + SdkUnitTest.TestId;
// The same local test root, but built from the current working directory.
private static readonly string LocalPath = Directory.GetCurrentDirectory() + @"\" + SdkUnitTest.TestId;
// Remote test root ("/<BasePath>/Test1<TestId>"); assigned in SetupClassTests.
private static string RemotePath;
// Upload source built on LocalPath; used by TestUploadNonBinary.
private static readonly string LocalPathUpload1 = $"{LocalPath}\\B";
// Upload source built on LocalRelativePath; used by TestUploadBinary and the resume/disable-logging test.
private static readonly string LocalPathUpload2 = $"{LocalRelativePath}\\C";
// Remote destinations for the two upload sources ("<RemotePath>/Uploader/B" and ".../C").
private static string RemotePathUpload1;
private static string RemotePathUpload2;
// Remote source tree for TestDownload ("<RemotePath>/Downloader/A").
private static string RemotePathDownload;
// Local destination for TestDownload.
private static readonly string LocalPathDownload = $"{LocalPath}\\A";
// Passed as the final argument to FileUploader.Upload / FileDownloader.Download and assigned to
// FileDownloader.SkipChunkingWeightThreshold in SetupTest.
private static int TransferChunkSize = 240 * 1024;
// Passed to DataCreator when generating the test trees; presumably the lower and upper bound of the
// generated file sizes - TODO confirm against DataCreator.
private static int LowFileSize = 100 * 1024;
private static int HighFileSize = 500 * 1024;
private static int DataCreatorBuffSize = 2 * 1024; // Interval at which DataCreator writes a new line; should be less than the AdlsOutputStream buffer capacity.
private static int CopyBufferSize = 4 * 1024; // Buffer size for file copy and AdlsOutputStream; not strictly necessary, kept small to catch corner cases.
private static int ReadBufferForwardSize = 8; // For non-binary transfers data is read forward; this is how many bytes to read forward.
/// <summary>
/// One-time fixture setup. Resolves the remote paths from the "BasePath" test property, clears any
/// remote leftovers under <c>RemotePath</c>, makes sure the local root exists, and generates the
/// remote tree used by the download test and the two local trees used by the upload tests.
/// </summary>
/// <param name="context">MSTest context; must carry a "BasePath" property.</param>
[ClassInitialize]
public static void SetupClassTests(TestContext context)
{
    BasePath = context.Properties["BasePath"].ToString();
    RemotePath = $"/{BasePath}/Test1{SdkUnitTest.TestId}";
    RemotePathUpload1 = $"{RemotePath}/Uploader/B";
    RemotePathUpload2 = $"{RemotePath}/Uploader/C";
    RemotePathDownload = $"{RemotePath}/Downloader/A";

    _adlsClient = SdkUnitTest.SetupSuperClient();
    // Start from a clean remote root in case an earlier run left data behind.
    _adlsClient.DeleteRecursive(RemotePath);

    // CreateDirectory does nothing when the directory is already present.
    Directory.CreateDirectory(LocalPath);

    DataCreator.CreateDirRecursiveRemote(_adlsClient, RemotePathDownload, 2, 3, 3, 4, LowFileSize, HighFileSize, true);
    // NOTE(review): BuffSize is assigned only after the remote tree has been generated, so the remote
    // data is created with whatever BuffSize held before this line - confirm that is intended.
    DataCreator.BuffSize = DataCreatorBuffSize;
    DataCreator.CreateDirRecursiveLocal(LocalPathUpload1, 1, 3, 3, 4, LowFileSize, HighFileSize, "", true);
    DataCreator.CreateDirRecursiveLocal(LocalPathUpload2, 1, 3, 3, 4, LowFileSize, HighFileSize, "", true);
}
/// <summary>
/// Per-test setup: overrides static settings on CopyFileJob, AdlsOutputStream and FileDownloader with the
/// small values declared on this class. CleanupTest assigns them again after each test.
/// </summary>
[TestInitialize]
public void SetupTest()
{
// Read-forward size and output-stream buffer limits come from this class's small test constants.
CopyFileJob.ReadForwardBuffSize = ReadBufferForwardSize;
AdlsOutputStream.BufferMaxCapacity = CopyBufferSize;
AdlsOutputStream.BufferMinCapacity = 0;
// The settings below force downloads to be chunked for sizes greater than the chunk size.
FileDownloader.SkipChunkingWeightThreshold = TransferChunkSize;
FileDownloader.NumLargeFileThreshold = Int64.MaxValue;
}
/// <summary>
/// Uploads the tree at <c>LocalPathUpload1</c> to <c>RemotePathUpload1</c>, checks that nothing failed or
/// was skipped, and verifies the remote tree against the local one. It then uploads again with
/// <c>IfExists.Fail</c> and expects the number of skipped entries to equal the number of files
/// transferred by the first upload.
/// </summary>
[TestMethod]
public void TestUploadNonBinary()
{
    TransferStatus status = FileUploader.Upload(LocalPathUpload1, RemotePathUpload1, _adlsClient, 10, IfExists.Overwrite, null, false, false, false, false, default(CancellationToken), false, TransferChunkSize);
    // AreEqual (rather than IsTrue(a == b)) reports the actual counts when the assertion fails.
    Assert.AreEqual(0, status.EntriesFailed.Count, "First upload reported failed entries.");
    Assert.AreEqual(0, status.EntriesSkipped.Count, "First upload reported skipped entries.");
    long origSuccess = status.FilesTransfered;

    var localQueue = new Queue<DirectoryInfo>();
    var remoteQueue = new Queue<DirectoryEntry>();
    localQueue.Enqueue(new DirectoryInfo(LocalPathUpload1));
    remoteQueue.Enqueue(_adlsClient.GetDirectoryEntry(RemotePathUpload1));
    Verify(localQueue, remoteQueue);

    status = FileUploader.Upload(LocalPathUpload1, RemotePathUpload1, _adlsClient, 10, IfExists.Fail, null, false, false, false, false, default(CancellationToken), false, TransferChunkSize);
    // Widen to long so both sides of the comparison have the same type.
    long skippedOnRerun = status.EntriesSkipped.Count;
    Assert.AreEqual(origSuccess, skippedOnRerun, "Re-upload with IfExists.Fail should skip every file transferred by the first upload.");
}
/// <summary>
/// Uploads the tree at <c>LocalPathUpload2</c> (a relative local path) to <c>RemotePathUpload2</c>, checks
/// that nothing failed or was skipped, and verifies the remote tree against the local one. It then
/// uploads again with <c>IfExists.Fail</c> and expects the number of skipped entries to equal the number
/// of files transferred by the first upload.
/// </summary>
[TestMethod]
public void TestUploadBinary()
{
    TransferStatus status = FileUploader.Upload(LocalPathUpload2, RemotePathUpload2, _adlsClient, 10, IfExists.Overwrite, null, false, true, false, true, default(CancellationToken), false, TransferChunkSize);
    // AreEqual (rather than IsTrue(a == b)) reports the actual counts when the assertion fails.
    Assert.AreEqual(0, status.EntriesFailed.Count, "First upload reported failed entries.");
    Assert.AreEqual(0, status.EntriesSkipped.Count, "First upload reported skipped entries.");
    long origSuccess = status.FilesTransfered;

    var localQueue = new Queue<DirectoryInfo>();
    var remoteQueue = new Queue<DirectoryEntry>();
    localQueue.Enqueue(new DirectoryInfo(LocalPathUpload2));
    remoteQueue.Enqueue(_adlsClient.GetDirectoryEntry(RemotePathUpload2));
    Verify(localQueue, remoteQueue);

    // NOTE(review): the eighth argument is true in the first call and false here - confirm intended.
    status = FileUploader.Upload(LocalPathUpload2, RemotePathUpload2, _adlsClient, 10, IfExists.Fail, null, false, false, false, true, default(CancellationToken), false, TransferChunkSize);
    // Widen to long so both sides of the comparison have the same type.
    long skippedOnRerun = status.EntriesSkipped.Count;
    Assert.AreEqual(origSuccess, skippedOnRerun, "Re-upload with IfExists.Fail should skip every file transferred by the first upload.");
}
/// <summary>
/// Expects <c>AdlsClient.BulkUpload</c> to reject this argument combination with an
/// <see cref="ArgumentException"/> whose message contains
/// "resume and disablelogging both cannot be true". The test fails if no exception is thrown.
/// </summary>
[TestMethod]
public void TestUploadResumeAndDisableTransferLogging()
{
    ArgumentException caught = null;
    try
    {
        _adlsClient.BulkUpload(LocalPathUpload2, RemotePathUpload2, 10, IfExists.Overwrite, true, null, false, true, true, default(CancellationToken));
    }
    catch (ArgumentException ex)
    {
        caught = ex;
    }

    // Previously a call that did not throw let the test pass without checking anything.
    Assert.IsNotNull(caught, "BulkUpload was expected to throw ArgumentException but completed without error.");
    Assert.IsTrue(caught.Message.Contains("resume and disablelogging both cannot be true"), "Unexpected exception message: " + caught.Message);
}
/// <summary>
/// Downloads the tree at <c>RemotePathDownload</c> to <c>LocalPathDownload</c>, checks that nothing failed
/// or was skipped, and verifies the local tree against the remote one. It then downloads again with
/// <c>IfExists.Fail</c> and expects the number of skipped entries to equal the number of files
/// transferred by the first download.
/// </summary>
[TestMethod]
public void TestDownload()
{
    TransferStatus status = FileDownloader.Download(RemotePathDownload, LocalPathDownload, _adlsClient, 25, IfExists.Overwrite, null, false, false, false, default(CancellationToken), false, 4194304, TransferChunkSize);
    // AreEqual (rather than IsTrue(a == b)) reports the actual counts when the assertion fails.
    Assert.AreEqual(0, status.EntriesFailed.Count, "First download reported failed entries.");
    Assert.AreEqual(0, status.EntriesSkipped.Count, "First download reported skipped entries.");
    long origSuccess = status.FilesTransfered;

    var localQueue = new Queue<DirectoryInfo>();
    var remoteQueue = new Queue<DirectoryEntry>();
    localQueue.Enqueue(new DirectoryInfo(LocalPathDownload));
    remoteQueue.Enqueue(_adlsClient.GetDirectoryEntry(RemotePathDownload));
    Verify(localQueue, remoteQueue);

    status = FileDownloader.Download(RemotePathDownload, LocalPathDownload, _adlsClient, 10, IfExists.Fail);
    // Widen to long so both sides of the comparison have the same type.
    long skippedOnRerun = status.EntriesSkipped.Count;
    Assert.AreEqual(origSuccess, skippedOnRerun, "Re-download with IfExists.Fail should skip every file transferred by the first download.");
}
/// <summary>
/// Fills <paramref name="arr"/> from <paramref name="stream"/>, issuing reads until either the buffer is
/// full or a read returns zero bytes.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <param name="arr">Destination buffer; filled from index 0.</param>
/// <returns>Total number of bytes placed into <paramref name="arr"/>.</returns>
private int Read(Stream stream, byte[] arr)
{
    int capacity = arr.Length;
    int filled = 0;
    while (filled < capacity)
    {
        int got = stream.Read(arr, filled, capacity - filled);
        if (got == 0)
        {
            // A zero-byte read means there is nothing more to read.
            break;
        }
        filled += got;
    }
    return filled;
}
/// <summary>
/// Compares two byte buffers for equality.
/// </summary>
/// <param name="localBuff">First buffer.</param>
/// <param name="remoteBuff">Second buffer.</param>
/// <returns>
/// True when both references are the same (including both null), or when both buffers have the same
/// length and identical bytes; otherwise false.
/// </returns>
private bool ByteArrayComparer(byte[] localBuff, byte[] remoteBuff)
{
    if (ReferenceEquals(localBuff, remoteBuff))
    {
        return true;
    }

    // Without the length check a shorter remoteBuff caused IndexOutOfRangeException and a longer one
    // with a matching prefix was reported as equal.
    if (localBuff == null || remoteBuff == null || localBuff.Length != remoteBuff.Length)
    {
        return false;
    }

    for (int i = 0; i < localBuff.Length; i++)
    {
        if (localBuff[i] != remoteBuff[i])
        {
            return false;
        }
    }
    return true;
}
/// <summary>
/// Walks a local and a remote directory tree in lock-step, breadth-first, and asserts that they match:
/// every remote entry must have a local entry of the same name and kind, every remote file must have
/// the same length and bytes as its local counterpart, and each directory pair must contain the same
/// number of entries. Local files whose name ends with "-transfer.dat" are ignored.
/// </summary>
/// <param name="localQueue">Queue seeded with the local root directory.</param>
/// <param name="remoteQueue">Queue seeded with the matching remote root directory entry.</param>
private void Verify(Queue<DirectoryInfo> localQueue, Queue<DirectoryEntry> remoteQueue)
{
    // Allocated once and reused for every file instead of two fresh 16 MB arrays per file.
    byte[] localBuff = new byte[16 * 1024 * 1024];
    byte[] remoteBuff = new byte[16 * 1024 * 1024];

    while (localQueue.Count > 0 && remoteQueue.Count > 0)
    {
        // Index the local directory's children by name.
        var localDict = new SortedDictionary<string, FileSystemInfo>();
        DirectoryInfo localDir = localQueue.Dequeue();
        foreach (var subDir in localDir.EnumerateDirectories())
        {
            localDict.Add(subDir.Name, subDir);
        }
        foreach (var file in localDir.EnumerateFiles())
        {
            // Presumably transfer-log/metadata files written next to the data - TODO confirm.
            if (file.Name.EndsWith("-transfer.dat", StringComparison.Ordinal))
            {
                continue;
            }
            localDict.Add(file.Name, file);
        }

        int remoteEntries = 0;
        DirectoryEntry remoteDir = remoteQueue.Dequeue();
        foreach (var dir in _adlsClient.EnumerateDirectory(remoteDir.FullName))
        {
            FileSystemInfo localEntry;
            if (!localDict.TryGetValue(dir.Name, out localEntry))
            {
                Assert.Fail(dir.Name + " should exist in local.");
            }
            remoteEntries++;

            if (dir.Type == DirectoryEntryType.DIRECTORY)
            {
                if (!(localEntry is DirectoryInfo))
                {
                    Assert.Fail(localEntry + " should also be a directory");
                }
                // Queue the pair together so both queues stay aligned.
                localQueue.Enqueue((DirectoryInfo)localEntry);
                remoteQueue.Enqueue(dir);
            }
            else
            {
                if (!(localEntry is FileInfo))
                {
                    Assert.Fail(localEntry + " should also be a file");
                }
                var localFileEntry = (FileInfo)localEntry;
                using (Stream localStream = localFileEntry.OpenRead(),
                    remoteStream = _adlsClient.GetReadStream(dir.FullName))
                {
                    AssertStreamsEqual(dir.FullName, localStream, remoteStream, localBuff, remoteBuff);
                }
            }
        }

        Assert.AreEqual(localDict.Count, remoteEntries, "Entry count differs between local " + localDir.FullName + " and remote " + remoteDir.FullName);
    }
}

// Asserts that two streams have the same length and identical content, reading both in buffer-sized
// chunks and comparing only the bytes actually read in each chunk.
private void AssertStreamsEqual(string name, Stream localStream, Stream remoteStream, byte[] localBuff, byte[] remoteBuff)
{
    Assert.AreEqual(localStream.Length, remoteStream.Length, name + ": local and remote lengths differ");
    long lengthToRead = localStream.Length;
    long offset = 0;
    while (lengthToRead > 0)
    {
        int localBytesRead = Read(localStream, localBuff);
        int remoteBytesRead = Read(remoteStream, remoteBuff);
        Assert.AreEqual(localBytesRead, remoteBytesRead, name + ": chunk sizes differ at offset " + offset);
        if (localBytesRead == 0)
        {
            // The stream ended before its reported length was consumed; fail rather than loop forever.
            Assert.Fail("Unexpected problem");
        }
        for (int i = 0; i < localBytesRead; i++)
        {
            if (localBuff[i] != remoteBuff[i])
            {
                Assert.Fail(name + ": content differs at offset " + (offset + i));
            }
        }
        offset += localBytesRead;
        lengthToRead -= localBytesRead;
    }
}
/// <summary>
/// Per-test teardown: re-assigns the static settings that SetupTest overrides so they do not leak into
/// other tests. NOTE(review): the values below are presumably the library defaults - confirm.
/// </summary>
[TestCleanup]
public void CleanupTest()
{
CopyFileJob.ReadForwardBuffSize = 8 * 1024;
AdlsOutputStream.BufferMaxCapacity = 4 * 1024 * 1024;
AdlsOutputStream.BufferMinCapacity = 1 * 1024 * 1024;
// The trailing L makes this a long expression.
FileDownloader.SkipChunkingWeightThreshold = 1 * 1024 * 1024 * 1024L;
FileDownloader.NumLargeFileThreshold = 20;
}
/// <summary>
/// One-time teardown: deletes the remote test root and the local test folder created for this run.
/// The local folder is removed even when the remote delete throws.
/// </summary>
[ClassCleanup]
public static void CleanUpClassTests()
{
    try
    {
        // _adlsClient is only assigned in SetupClassTests; skip the remote delete if it is still null.
        _adlsClient?.DeleteRecursive(RemotePath);
    }
    finally
    {
        // Always remove the local test data so a remote failure does not leave it behind.
        DataCreator.DeleteRecursiveLocal(new DirectoryInfo(LocalPath));
    }
}
}
}