code/KustoCopyConsole/Storage/LogStorage.cs (357 lines of code) (raw):

using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading.Tasks; namespace KustoCopyConsole.Storage { internal partial class LogStorage { #region Inner Types /// <summary>Used for all three types of blobs: index, log (shard) & view.</summary> private class VersionHeader { public Version AppVersion { get; set; } = new(); } /// <summary>Used by the index blob.</summary>> private class IndexInfo { public long ShardCount { get; set; } = 0; } /// <summary>Used by log blobs (shards).</summary> private class LogInfo { public bool IsNewProcess { get; set; } = true; } /// <summary>Used by view blobs.</summary> private class ViewInfo { public long LastShardIncluded { get; set; } = 0; } [JsonSourceGenerationOptions( WriteIndented = false, PropertyNameCaseInsensitive = true, PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] [JsonSerializable(typeof(VersionHeader))] [JsonSerializable(typeof(IndexInfo))] [JsonSerializable(typeof(LogInfo))] [JsonSerializable(typeof(ViewInfo))] private partial class HeaderJsonContext : JsonSerializerContext { } #endregion private const string INDEX_PATH = "logs/index.log"; private const string LATEST_PATH = "logs/latest.log"; private const string HISTORICAL_LOG_ROOT_PATH = "logs/historical/"; private const string TEMP_PATH = "logs/temp/"; private readonly IFileSystem _fileSystem; private readonly Version _appVersion; private bool _isNewProcess = true; private long _currentShardIndex; private IAppendStorage _logAppendStorage; private int _appendCount = 0; #region Constructors private LogStorage( IFileSystem fileSystem, Version appVersion, long currentShardIndex, IAppendStorage logAppendStorage) { _fileSystem = fileSystem; _appVersion = appVersion; _currentShardIndex = currentShardIndex; _logAppendStorage = logAppendStorage; } public async static Task<LogStorage> CreateAsync( IFileSystem fileSystem, Version appVersion, CancellationToken ct) { var logFileCount = await FetchShardCountAsync(fileSystem, ct); var currentShardIndex = logFileCount + 1; var logAppendStorage = await GetLogAppendStorageAsync( fileSystem, currentShardIndex, ct); return new LogStorage(fileSystem, appVersion, currentShardIndex, logAppendStorage); } private async static Task<long> FetchShardCountAsync( IFileSystem fileSystem, CancellationToken ct) { using (var stream = await fileSystem.OpenReadAsync(INDEX_PATH, ct)) { if (stream == null) { return 0; } else { using (var reader = new StreamReader(stream)) { var headerText = await reader.ReadLineAsync(); var indexText = await reader.ReadLineAsync(); var header = headerText != null ? JsonSerializer.Deserialize<VersionHeader>( headerText, HeaderJsonContext.Default.VersionHeader) : null; var indexInfo = indexText != null ? JsonSerializer.Deserialize<IndexInfo>( indexText, HeaderJsonContext.Default.IndexInfo) : null; if (header == null) { throw new InvalidDataException("Index blob doesn't contain header"); } if (indexInfo == null) { throw new InvalidDataException( "Index blob doesn't contain index information"); } return indexInfo.ShardCount; } } } } #endregion /// <summary>Maximum size of buffer that can be written.</summary> public int MaxBufferSize => _logAppendStorage.MaxBufferSize; /// <summary>Reads the latest view by chunks.</summary> /// <param name="ct"></param> /// <returns></returns> /// <exception cref="NotImplementedException"></exception> public async IAsyncEnumerable<BlobChunk> ReadLatestViewAsync( [EnumeratorCancellation] CancellationToken ct) { long shardIndexIncluded = 0; // Start with the view itself using (var latestStream = await _fileSystem.OpenReadAsync(LATEST_PATH, ct)) { if (latestStream != null) { var versionHeaderText = await ReadLineGreedilyAsync(latestStream, ct); var viewText = await ReadLineGreedilyAsync(latestStream, ct); var versionHeader = versionHeaderText != null ? JsonSerializer.Deserialize<VersionHeader>( versionHeaderText, HeaderJsonContext.Default.VersionHeader) : null; var viewInfo = viewText != null ? JsonSerializer.Deserialize<ViewInfo>( viewText, HeaderJsonContext.Default.ViewInfo) : null; if (versionHeader == null) { throw new InvalidDataException( "Latest view blob doesn't contain version header"); } if (viewInfo == null) { throw new InvalidDataException( "Latest view blob doesn't contain view information"); } shardIndexIncluded = viewInfo.LastShardIncluded; yield return new BlobChunk(true, versionHeader.AppVersion, latestStream); } } // Loop through log files not included in the view for (long i = shardIndexIncluded + 1; i < _currentShardIndex; ++i) { using (var logStream = await _fileSystem.OpenReadAsync(GetLogPath(i), ct)) { if (logStream != null) { var versionHeaderText = await ReadLineGreedilyAsync(logStream, ct); var logInfoText = await ReadLineGreedilyAsync(logStream, ct); var versionHeader = versionHeaderText != null ? JsonSerializer.Deserialize<VersionHeader>( versionHeaderText, HeaderJsonContext.Default.VersionHeader) : null; var logInfo = logInfoText != null ? JsonSerializer.Deserialize<LogInfo>( logInfoText, HeaderJsonContext.Default.LogInfo) : null; if (versionHeader == null) { throw new InvalidDataException( "Log blob doesn't contain version header"); } if (logInfo == null) { throw new InvalidDataException( "Log blob doesn't contain view information"); } yield return new BlobChunk( logInfo.IsNewProcess, versionHeader.AppVersion, logStream); } } } } /// <summary>Writes the latest view.</summary> /// <param name="content"></param> /// <param name="ct"></param> /// <returns></returns> /// <exception cref="NotImplementedException"></exception> public async Task WriteLatestViewAsync(IEnumerable<byte> content, CancellationToken ct) { var completedShards = await SealShardAsync(ct); if (completedShards >= 1) { await AtomicReplaceAsync( LATEST_PATH, async tempStorage => { using (var memoryStream = new MemoryStream()) { // Headers for the view var header = new VersionHeader { AppVersion = _appVersion }; var viewInfo = new ViewInfo { LastShardIncluded = completedShards }; JsonSerializer.Serialize( memoryStream, header, HeaderJsonContext.Default.VersionHeader); memoryStream.WriteByte((byte)'\n'); JsonSerializer.Serialize( memoryStream, viewInfo, HeaderJsonContext.Default.ViewInfo); memoryStream.WriteByte((byte)'\n'); await tempStorage.AtomicAppendAsync(memoryStream.ToArray(), ct); } var appendBlock = new List<byte>(MaxBufferSize); // Content of the view foreach (var character in content) { appendBlock.Add(character); if (appendBlock.Count == MaxBufferSize) { await tempStorage.AtomicAppendAsync(appendBlock, ct); appendBlock.Clear(); } } // Push the remainder of content if (appendBlock.Any()) { await tempStorage.AtomicAppendAsync(appendBlock, ct); } }, ct); } } /// <summary>Append some content atomically.</summary> /// <param name="content"></param> /// <param name="ct"></param> /// <returns></returns> public async Task AtomicAppendAsync(IEnumerable<byte> content, CancellationToken ct) { if (_appendCount == 0) { // Shard hasn't been initialized await InitNewShardAsync(ct); } if (await _logAppendStorage.AtomicAppendAsync(content, ct)) { // Append worked, ack ++_appendCount; } else { // Roll over and retry await SealShardAsync(ct); await AtomicAppendAsync(content, ct); } } #region Shard management private async Task<long> SealShardAsync(CancellationToken ct) { if (_appendCount != 0) { ++_currentShardIndex; _appendCount = 0; _logAppendStorage = await GetLogAppendStorageAsync( _fileSystem, _currentShardIndex, ct); } return _currentShardIndex - 1; } private async Task InitNewShardAsync(CancellationToken ct) { await UpdateIndexAsync(ct); await AppendHeadersAsync(ct); } private async Task UpdateIndexAsync(CancellationToken ct) { await AtomicReplaceAsync( INDEX_PATH, async tempStorage => { using (var memoryStream = new MemoryStream()) { var header = new VersionHeader { AppVersion = _appVersion }; var indexInfo = new IndexInfo { ShardCount = _currentShardIndex }; JsonSerializer.Serialize( memoryStream, header, HeaderJsonContext.Default.VersionHeader); memoryStream.WriteByte((byte)'\n'); JsonSerializer.Serialize( memoryStream, indexInfo, HeaderJsonContext.Default.IndexInfo); memoryStream.WriteByte((byte)'\n'); await tempStorage.AtomicAppendAsync(memoryStream.ToArray(), ct); } }, ct); } private async Task AppendHeadersAsync(CancellationToken ct) { using (var memoryStream = new MemoryStream()) { var header = new VersionHeader { AppVersion = _appVersion }; var logInfo = new LogInfo { IsNewProcess = _isNewProcess }; JsonSerializer.Serialize( memoryStream, header, HeaderJsonContext.Default.VersionHeader); memoryStream.WriteByte((byte)'\n'); JsonSerializer.Serialize( memoryStream, logInfo, HeaderJsonContext.Default.LogInfo); memoryStream.WriteByte((byte)'\n'); await _logAppendStorage.AtomicAppendAsync(memoryStream.ToArray(), ct); _isNewProcess = false; ++_appendCount; } } #endregion private async Task AtomicReplaceAsync( string path, Func<IAppendStorage, Task> appendTempStorageFunc, CancellationToken ct) { var tempFileName = $"{TEMP_PATH}{Guid.NewGuid()}.log"; var tempStorage = await _fileSystem.OpenWriteAsync(tempFileName, ct); await appendTempStorageFunc(tempStorage); await _fileSystem.MoveAsync(tempFileName, path, ct); await _fileSystem.RemoveFolderAsync(TEMP_PATH, ct); } private async static Task<IAppendStorage> GetLogAppendStorageAsync( IFileSystem fileSystem, long logFileIndex, CancellationToken ct) { return await fileSystem.OpenWriteAsync(GetLogPath(logFileIndex), ct); } private static string GetLogPath(long logFileIndex) { return $"{HISTORICAL_LOG_ROOT_PATH}{logFileIndex:D20}.log"; } private async Task<string> ReadLineGreedilyAsync( Stream stream, CancellationToken ct) { var buffer = new byte[1]; var accumulatedBytes = new List<byte>(); while (await stream.ReadAsync(buffer, 0, 1, ct) == 1 && buffer[0] != (byte)'\n') { accumulatedBytes.Add(buffer[0]); } var text = ASCIIEncoding.UTF8.GetString(accumulatedBytes.ToArray()); return text; } } }