AdlsDotNetSDK/FileStatusList.cs (175 lines of code) (raw):
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Azure.DataLake.Store.RetryPolicies;
using Microsoft.Azure.DataLake.Store.Serialization;
namespace Microsoft.Azure.DataLake.Store
{
/// <summary>
/// Enumerable that exposes enumerator:FileStatusList
/// </summary>
internal class FileStatusOutput<T> : IEnumerable<T> where T:DirectoryEntry
{
/// <summary>
/// Number of maximum directory entries to be retrieved from server. If -1 then retrieve all entries
/// </summary>
private readonly int _maxEntries;
/// <summary>
/// Filename after which list of files should be obtained from server
/// </summary>
private readonly string _listAfter;
/// <summary>
/// Filename till which list of files should be obtained from server
/// </summary>
private readonly string _listBefore;
/// <summary>
/// ADLS Client
/// </summary>
private readonly AdlsClient _client;
/// <summary>
/// Way the user or group object will be represented
/// </summary>
private readonly UserGroupRepresentation? _ugr;
/// <summary>
/// Path of the directory containing the sub-directories or files
/// </summary>
private readonly string _path;
/// <summary>
/// selection
/// </summary>
private readonly Selection _selection;
private readonly IDictionary<string, string> _extraQueryParamsForListStatus;
private readonly CancellationToken _cancelToken;
/// <summary>
/// Returns the enumerator
/// </summary>
/// <returns></returns>
public IEnumerator<T> GetEnumerator()
{
return new FileStatusList<T>(_listBefore, _listAfter, _maxEntries, _ugr, _client, _selection, _path, _cancelToken, _extraQueryParamsForListStatus);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
internal FileStatusOutput(string listBefore, string listAfter, int maxEntries, UserGroupRepresentation? ugr, AdlsClient client, string path, CancellationToken cancelToken, IDictionary<string, string> extraQueryParamsForListStatus = null)
{
_listBefore = listBefore;
_maxEntries = maxEntries;
_listAfter = listAfter;
_ugr = ugr;
_client = client;
_path = path;
_cancelToken = cancelToken;
_extraQueryParamsForListStatus = extraQueryParamsForListStatus;
}
internal FileStatusOutput(string listBefore, string listAfter, int maxEntries, UserGroupRepresentation? ugr, AdlsClient client, string path, Selection selection, CancellationToken cancelToken, IDictionary<string, string> extraQueryParamsForListStatus = null)
{
_listBefore = listBefore;
_maxEntries = maxEntries;
_listAfter = listAfter;
_ugr = ugr;
_client = client;
_path = path;
_selection = selection;
_cancelToken = cancelToken;
_extraQueryParamsForListStatus = extraQueryParamsForListStatus;
}
}
/// <summary>
/// Encapsulates a collection storing the list of directory entries. Once the collection is traversed, retrieves next set of directory entries from server
/// This is for internal use only. Made public because we want to cast the enumerator to test enumeration with a smaller page size.
/// </summary>
internal class FileStatusList<T> : IEnumerator<T> where T:DirectoryEntry
{
/// <summary>
/// Internal collection storing list of directory entries retrieved from server. This is not the whole list of directory entries.
/// It's size is less than equal to listSize
/// </summary>
private List<T> FileStatus { get; set; }
/// <summary>
/// Number of maximum directory entries to retrieve from server at one time
/// </summary>
private int _listSize = 4000;
/// <summary>
/// Internal property to set the list size
/// </summary>
internal int ListSize
{
set { _listSize = value > 4000 ? 4000 : value; }
private get { return _listSize; }
}
/// <summary>
/// Maximum number of entries to be enumerated as entered by user. If it is -1 then enumerate all the directory entries
/// </summary>
private readonly int _maxEntries;
/// <summary>
/// Number of entries left to enumerate
/// </summary>
private int RemainingEntries { get; set; }
/// <summary>
/// Flag indicating enumerate all the directory entries
/// </summary>
private bool EnumerateAll { get; }
/// <summary>
/// Filename after which we should start the enumeration - entered by user
/// </summary>
private readonly string _listAfterClient;
/// <summary>
/// Filename after which list of files should be obtained from server next time, updated before everytime the
/// </summary>
private string ListAfterNext { get; set; }
/// <summary>
/// Filename till which list of files should be obtained from server
/// </summary>
private string ListBefore { get; }
/// <summary>
/// ADLS Client
/// </summary>
private AdlsClient Client { get; }
/// <summary>
/// selection
/// </summary>
private Selection Selection { get; }
/// <summary>
/// Way the user or group object will be represented
/// </summary>
private UserGroupRepresentation? Ugr { get; }
/// <summary>
/// Path of the directory conatianing the sub-directories or files
/// </summary>
private string Path { get; }
private readonly CancellationToken _cancelToken;
private readonly IDictionary<string, string> _extraQueryParamsForListStatus;
/// <summary>
/// Represents the current directory entry in the internal collection: FileStatus
/// </summary>
public T Current
{
get
{
try
{
return FileStatus[_position];
}
catch (IndexOutOfRangeException)
{
throw new InvalidOperationException("The index is out of range");
}
}
}
/// <summary>
/// Index representating the current position in the internal collection: FileStatus
/// </summary>
private int _position = -1;
/// <summary>
/// Immplemented interface property
/// </summary>
object IEnumerator.Current => Current;
/// <summary>
/// Represent the continationToken for ListStatus
/// </summary>
private string continuationToken;
/// <summary>
/// Advances the enumerator to the next element in the internal collection.
/// If the end of the internal collection is reached, performs a ListStatus call to the server to see if any more directories/files need to be enumerated. If yes
/// then the internal collection is populated with the next set of directory entries. The internal index pointing to the current element is updated. If not, then returns false.
/// </summary>
/// <returns>True if there is a next element to enumerate else false</returns>
public bool MoveNext()
{
if (_cancelToken.IsCancellationRequested)
{
throw new OperationCanceledException();
}
//Not called for first time, first time when this is called ListAfterNext will be whatever client has passed
if (FileStatus != null)
{
_position++;
//FileStatus.Count will be minimum of ListSize and remaining entries asked by user
if (_position < FileStatus.Count)//Still more data to be enumerated
{
if (!EnumerateAll)
{
RemainingEntries--;
}
return true;
}
//Number of entries wanted by the user is already enumerated
//RemainingEntries 0 means no need to look at server since last time we retrieved "RemainingEntries" number of entries from server
if (!EnumerateAll && RemainingEntries <= 0)
{
return false;
}
// Older behavior for Selection == Minimal. Remove this if else when API is updated.
if (Selection == Selection.Minimal)
{
if (_position < ListSize)
{
//position has reached end of the internal list. But number of directory entries retrieved from last server call is less than list
//size so no more entries are left on server. So even though RemainingEntries is positive, but there is no data in server. Return false
return false;
}
//Else we have to look in server to see if we still have any more directory entries to enumerate
//Obtain the last enumerated entry name so that we can retrieve files after that from server
ListAfterNext = FileStatus[_position - 1].Name;
}
else
{
if (string.IsNullOrEmpty(continuationToken))
{
//Continuation token is blank or null. No more entries.
return false;
}
else
{
//Obtain the last enumerated entry name so that we can retrieve files after that from server
ListAfterNext = continuationToken;
}
}
}
_position = -1;
OperationResponse resp = new OperationResponse();
int getListSize = EnumerateAll ? ListSize : Math.Min(ListSize, RemainingEntries);
// EnumerateDirectoryChangeAclJob also calls core separately. If you change logic here, consider changing there also
var fileListResult = Core.ListStatusAsync<DirectoryEntryListResult<T>>(Path, ListAfterNext, ListBefore, getListSize, Ugr, Selection, _extraQueryParamsForListStatus, Client, new RequestOptions(Client.GetPerRequestTimeout(), new ExponentialRetryPolicy()), resp, _cancelToken).GetAwaiter().GetResult();
if (!resp.IsSuccessful)
{
throw Client.GetExceptionFromResponse(resp, "Error getting listStatus for path " + Path + " after " + ListAfterNext);
}
FileStatus = Core.GetDirectoryEntryListWithFullPath(Path, fileListResult, resp);
if (!resp.IsSuccessful)
{
throw Client.GetExceptionFromResponse(resp, "Unexpected error getting listStatus for path " + Path + " after " + ListAfterNext);
}
// Retrieve the continuation token here since above we have checked whether fileListResult. FileStatuses is not null
continuationToken = fileListResult.FileStatuses.ContinuationToken;
return MoveNext();
}
internal FileStatusList(string listBefore, string listAfter, int maxEntries, UserGroupRepresentation? ugr, AdlsClient client, Selection selection, string path, CancellationToken cancelToken, IDictionary<string, string> extraQueryParamsForListStatus = null)
{
ListBefore = listBefore;
ListAfterNext = _listAfterClient = listAfter;
RemainingEntries = _maxEntries = maxEntries;
if (_maxEntries == -1)
{
EnumerateAll = true;
}
Ugr = ugr;
Client = client;
Selection = selection;
Path = path;
_cancelToken = cancelToken;
_extraQueryParamsForListStatus = extraQueryParamsForListStatus;
}
/// <summary>
/// Clears the internal collection and resets the index, ListAfterNext and remaininig entries of collection
/// </summary>
public void Reset()
{
FileStatus = null;
_position = -1;
ListAfterNext = _listAfterClient;
RemainingEntries = _maxEntries;
}
/// <summary>
/// Disposes the enumerable
/// </summary>
public void Dispose()
{
FileStatus = null;
}
}
}