in AdlsDotNetSDK/FileTransfer/FileTransferCommon.cs [403:461]
/// <summary>
/// Consumer loop: repeatedly polls <c>ConsumerQueue</c> for a job, runs it, and folds the
/// resulting <c>SingleEntryTransferStatus</c> into <c>Status</c> and the recorded metadata.
/// The loop ends when cancellation has been requested on <c>CancelToken</c> or when a
/// <c>PoisonJob</c> is dequeued.
/// </summary>
private void ConsumerRun()
{
    while (!CancelToken.IsCancellationRequested)
    {
        var nextJob = ConsumerQueue.Poll();
        if (nextJob is PoisonJob)
        {
            // Put a fresh poison job back before stopping
            // (presumably so other consumers polling the same queue also stop; confirm against the producer side).
            ConsumerQueue.Add(new PoisonJob());
            break;
        }

        var outcome = nextJob.DoRun(JobLog) as SingleEntryTransferStatus;
        if (outcome == null)
        {
            // The job produced nothing that is a SingleEntryTransferStatus; there is nothing to account for.
            continue;
        }

        if (outcome.Status != SingleChunkStatus.Successful)
        {
            // Anything that is neither successful nor failed is recorded as skipped.
            if (outcome.Status == SingleChunkStatus.Failed)
            {
                Status.AddFailedEntries(outcome);
            }
            else
            {
                Status.AddSkippedEntries(outcome.EntryName);
            }

            continue;
        }

        // Successful entry: bump the counter matching its type and record its metadata.
        var entrySize = outcome.EntrySize;
        if (outcome.Type == EntryType.Chunk)
        {
            Interlocked.Increment(ref Status.ChunksTransfered);
            RecordedMetadata.AddRecord($"CHUNK{TransferLog.MetaDataDelimiter}{outcome.Source}{TransferLog.MetaDataDelimiter}{outcome.ChunkId}");
        }
        else if (outcome.Type == EntryType.File)
        {
            // Entry size is zero for concat, so a zero-size file entry is not counted as a non-chunked file
            var isConcat = entrySize == 0;
            if (!isConcat)
            {
                Interlocked.Increment(ref Status.NonChunkedFileTransferred);
            }

            Interlocked.Increment(ref Status.FilesTransfered);
            // For successful concat we want to flush the metadata records
            AddCompleteRecord(outcome.Source, isConcat);
        }
        else
        {
            // Any remaining entry type is counted as a directory.
            Interlocked.Increment(ref Status.DirectoriesTransferred);
            AddCompleteRecord(outcome.Source);
        }

        // Only positive sizes contribute to the transferred-bytes total.
        if (entrySize > 0)
        {
            Interlocked.Add(ref Status.SizeTransfered, entrySize);
        }
    }
}