in code/KustoCopyConsole/Storage/LogStorage.cs [146:223]
/// <summary>
/// Streams the latest view blob (when it exists) followed by every log shard
/// that is newer than the view, each one positioned right after its two
/// header lines (version header, then view / log information).
/// </summary>
/// <param name="ct">Token used to cancel the underlying storage reads.</param>
/// <returns>
/// One <see cref="BlobChunk"/> per blob found.  The stream carried by a chunk
/// is owned by this iterator and is disposed as soon as the enumeration
/// advances, so a consumer must finish reading a chunk before requesting the
/// next one.
/// </returns>
/// <exception cref="InvalidDataException">
/// A blob exists but its version header or its view / log information line
/// is missing (or deserializes to <c>null</c>).
/// </exception>
public async IAsyncEnumerable<BlobChunk> ReadLatestViewAsync(
    [EnumeratorCancellation]
    CancellationToken ct)
{
    // Stays at 0 when there is no latest view, so the scan below starts at shard 1.
    long shardIndexIncluded = 0;

    // Start with the view itself
    using (var latestStream = await _fileSystem.OpenReadAsync(LATEST_PATH, ct))
    {
        // A null stream is treated as "no view blob": nothing is yielded for it.
        if (latestStream != null)
        {
            // The first two lines of the blob are JSON headers; whatever is left
            // in the stream after them is handed to the consumer untouched.
            var versionHeaderText = await ReadLineGreedilyAsync(latestStream, ct);
            var viewText = await ReadLineGreedilyAsync(latestStream, ct);
            var versionHeader = versionHeaderText != null
                ? JsonSerializer.Deserialize<VersionHeader>(
                    versionHeaderText,
                    HeaderJsonContext.Default.VersionHeader)
                : null;
            var viewInfo = viewText != null
                ? JsonSerializer.Deserialize<ViewInfo>(
                    viewText,
                    HeaderJsonContext.Default.ViewInfo)
                : null;

            if (versionHeader == null)
            {
                throw new InvalidDataException(
                    "Latest view blob doesn't contain version header");
            }
            if (viewInfo == null)
            {
                throw new InvalidDataException(
                    "Latest view blob doesn't contain view information");
            }

            // Shards up to and including this index are not re-read below.
            shardIndexIncluded = viewInfo.LastShardIncluded;

            // NOTE(review): the first argument lines up with IsNewProcess in the
            // log branch below, so the view is presumably always flagged as a
            // new process — confirm against BlobChunk.
            yield return new BlobChunk(true, versionHeader.AppVersion, latestStream);
        }
    }

    // Loop through log files not included in the view
    // NOTE(review): the upper bound excludes _currentShardIndex itself,
    // presumably because that shard is still being written — confirm.
    for (long i = shardIndexIncluded + 1; i < _currentShardIndex; ++i)
    {
        using (var logStream = await _fileSystem.OpenReadAsync(GetLogPath(i), ct))
        {
            // Shards for which no stream is returned are silently skipped.
            if (logStream != null)
            {
                var versionHeaderText = await ReadLineGreedilyAsync(logStream, ct);
                var logInfoText = await ReadLineGreedilyAsync(logStream, ct);
                var versionHeader = versionHeaderText != null
                    ? JsonSerializer.Deserialize<VersionHeader>(
                        versionHeaderText,
                        HeaderJsonContext.Default.VersionHeader)
                    : null;
                var logInfo = logInfoText != null
                    ? JsonSerializer.Deserialize<LogInfo>(
                        logInfoText,
                        HeaderJsonContext.Default.LogInfo)
                    : null;

                if (versionHeader == null)
                {
                    throw new InvalidDataException(
                        "Log blob doesn't contain version header");
                }
                if (logInfo == null)
                {
                    // Message fixed: this is the log header, not the view header.
                    throw new InvalidDataException(
                        "Log blob doesn't contain log information");
                }

                yield return new BlobChunk(
                    logInfo.IsNewProcess,
                    versionHeader.AppVersion,
                    logStream);
            }
        }
    }
}