src/StructuredLogger/BinaryLogger/BinLogReader.cs (246 lines of code) (raw):

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using Microsoft.Build.Framework;

namespace Microsoft.Build.Logging.StructuredLogger
{
    /// <summary>
    /// Provides a method to read a binary log file (*.binlog) and replay all stored BuildEventArgs
    /// by implementing IEventSource and raising corresponding events.
    /// </summary>
    /// <remarks>The class is public so that we can call it from MSBuild.exe when replaying a log file.</remarks>
    public sealed class BinLogReader : EventArgsDispatcher
    {
        /// <summary>
        /// Raised when the log reader encounters a binary blob embedded in the stream.
        /// The arguments include the blob kind and the byte buffer with the contents.
        /// </summary>
        public event Action<BinaryLogRecordKind, byte[]> OnBlobRead;

        /// <summary>
        /// Raised while enumerating records (see <see cref="ReadRecordsFromDecompressedStream"/>)
        /// when the underlying reader reports a string. The arguments are the string and the number
        /// of decompressed bytes consumed since the previously tracked position.
        /// </summary>
        public event Action<string, long> OnStringRead;

        /// <summary>
        /// Raised while enumerating records (see <see cref="ReadRecordsFromDecompressedStream"/>)
        /// when the underlying reader reports a name-value list. The arguments are the list and the
        /// number of decompressed bytes consumed since the previously tracked position.
        /// </summary>
        public event Action<IDictionary<string, string>, long> OnNameValueListRead;

        /// <summary>
        /// Raised by <see cref="Replay(Stream, Progress)"/> right after the file format version
        /// has been read and before it is validated.
        /// </summary>
        public event Action<int> OnFileFormatVersionRead;

        /// <summary>
        /// Raised at the end of <see cref="Replay(Stream, Progress)"/> for file format version 10
        /// and newer when the reader returned a non-empty set of strings.
        /// </summary>
        public event Action<IEnumerable<string>> OnStringDictionaryComplete;

        /// <summary>
        /// Raised when there was an exception reading a record from the file.
        /// </summary>
        public event Action<Exception> OnException;

        /// <summary>
        /// Read the provided binary log file and raise corresponding events for each BuildEventArgs
        /// </summary>
        /// <param name="sourceFilePath">The full file path of the binary log file</param>
        public void Replay(string sourceFilePath) => Replay(sourceFilePath, progress: null);

        /// <summary>
        /// Read the provided binary log file and raise corresponding events for each BuildEventArgs
        /// </summary>
        /// <param name="sourceFilePath">The full file path of the binary log file</param>
        /// <param name="progress">optional callback to receive progress updates</param>
        public void Replay(string sourceFilePath, Progress progress)
        {
            using (var stream = new FileStream(sourceFilePath, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                Replay(stream, progress);
            }
        }

        /// <summary>
        /// Read the provided (gzip-compressed) binary log stream and raise corresponding events
        /// for each BuildEventArgs, without progress reporting.
        /// </summary>
        /// <param name="stream">The compressed binary log stream. It is left open.</param>
        public void Replay(Stream stream)
        {
            Replay(stream, progress: null);
        }

        /// <summary>
        /// Read the provided (gzip-compressed) binary log stream and raise corresponding events
        /// for each BuildEventArgs.
        /// </summary>
        /// <param name="stream">The compressed binary log stream. It is left open.</param>
        /// <param name="progress">
        /// Optional callback to receive progress updates. Intermediate updates are based on the
        /// position within <paramref name="stream"/> and are therefore only reported when the stream
        /// is seekable; the final <c>1.0</c> update is always reported.
        /// </param>
        /// <exception cref="NotSupportedException">The log uses a newer file format than this reader supports.</exception>
        /// <exception cref="AggregateException">An event handler invoked during dispatch threw.</exception>
        public void Replay(Stream stream, Progress progress)
        {
            // leaveOpen: the caller owns the stream; we only own (and dispose) the layers we stack on top of it.
            using var gzipStream = new GZipStream(stream, CompressionMode.Decompress, leaveOpen: true);
            using var bufferedStream = new BufferedStream(gzipStream, 32768);
            using var binaryReader = new BinaryReader(bufferedStream);

            int fileFormatVersion = binaryReader.ReadInt32();
            OnFileFormatVersionRead?.Invoke(fileFormatVersion);

            EnsureFileFormatVersionKnown(fileFormatVersion);

            // Create the reader before the consumer task is started so that a failure here
            // cannot leave a background task waiting on a queue nobody will ever complete.
            using var reader = new BuildEventArgsReader(binaryReader, fileFormatVersion);
            reader.OnBlobRead += OnBlobRead;

            // Use a producer-consumer queue so that IO can happen on one thread
            // while processing can happen on another thread decoupled. The speed
            // up is from 4.65 to 4.15 seconds.
            var queue = new BlockingCollection<BuildEventArgs>(boundedCapacity: 5000);

            var processingTask = System.Threading.Tasks.Task.Run(() =>
            {
                foreach (var args in queue.GetConsumingEnumerable())
                {
                    Dispatch(args);
                }
            });

            int recordsRead = 0;

            Stopwatch stopwatch = Stopwatch.StartNew();

            // Length/Position throw on non-seekable streams, so only touch them when they are
            // actually needed for progress reporting and the stream supports them.
            bool reportPosition = progress != null && stream.CanSeek;
            long streamLength = reportPosition ? stream.Length : 0;

            try
            {
                while (true)
                {
                    BuildEventArgs instance = null;

                    try
                    {
                        instance = reader.Read();
                    }
                    catch (Exception ex)
                    {
                        // A failed read is reported and then treated like the end of the log.
                        OnException?.Invoke(ex);
                    }

                    recordsRead++;
                    if (instance == null)
                    {
                        break;
                    }

                    if (!TryEnqueue(queue, instance, processingTask))
                    {
                        // The consumer died (a handler threw); stop reading and let
                        // processingTask.Wait() below surface its exception.
                        break;
                    }

                    if (reportPosition && recordsRead % 1000 == 0 && stopwatch.ElapsedMilliseconds > 200)
                    {
                        stopwatch.Restart();
                        var streamPosition = stream.Position;
                        double ratio = (double)streamPosition / streamLength;
                        progress.Report(ratio);
                    }
                }
            }
            finally
            {
                // Always release the consumer, even if the producer loop failed,
                // otherwise the background task would block on the queue forever.
                queue.CompleteAdding();
            }

            processingTask.Wait();

            if (fileFormatVersion >= 10)
            {
                var strings = reader.GetStrings();
                if (strings != null && strings.Any())
                {
                    OnStringDictionaryComplete?.Invoke(strings);
                }
            }

            if (progress != null)
            {
                progress.Report(1.0);
            }
        }

        /// <summary>
        /// Adds <paramref name="item"/> to the bounded queue, waiting for free capacity, but gives up
        /// once the consumer task has completed so that a faulted consumer cannot block the producer forever.
        /// </summary>
        /// <returns><c>true</c> if the item was enqueued; <c>false</c> if the consumer is no longer running.</returns>
        private static bool TryEnqueue(
            BlockingCollection<BuildEventArgs> queue,
            BuildEventArgs item,
            System.Threading.Tasks.Task consumer)
        {
            while (!consumer.IsCompleted)
            {
                if (queue.TryAdd(item, millisecondsTimeout: 100))
                {
                    return true;
                }
            }

            return false;
        }

        /// <summary>
        /// Throws if the log file was written with a file format version newer than
        /// <c>BinaryLogger.FileFormatVersion</c>.
        /// </summary>
        /// <param name="fileFormatVersion">The version read from the log file header.</param>
        /// <exception cref="NotSupportedException">The version is newer than the latest supported one.</exception>
        private void EnsureFileFormatVersionKnown(int fileFormatVersion)
        {
            // the log file is written using a newer version of file format
            // that we don't know how to read
            if (fileFormatVersion > BinaryLogger.FileFormatVersion)
            {
                var text = $"Unsupported log file format. Latest supported version is {BinaryLogger.FileFormatVersion}, the log file has version {fileFormatVersion}.";
                if (BinaryLogger.IsNewerVersionAvailable)
                {
                    text += " Update available - restart this instance to automatically use newer version.";
                }

                throw new NotSupportedException(text);
            }
        }

        /// <summary>
        /// Wraps a sequence together with a cleanup action. The action runs at most once, either when
        /// the wrapper itself is disposed or when an enumerator obtained from it is disposed
        /// (which is what <c>foreach</c> and LINQ operators do).
        /// </summary>
        private class DisposableEnumerable<T> : IEnumerable<T>, IDisposable
        {
            private readonly IEnumerable<T> enumerable;
            private Action dispose;

            public static IEnumerable<T> Create(IEnumerable<T> enumerable, Action dispose)
            {
                return new DisposableEnumerable<T>(enumerable, dispose);
            }

            public DisposableEnumerable(IEnumerable<T> enumerable, Action dispose)
            {
                this.enumerable = enumerable;
                this.dispose = dispose;
            }

            /// <summary>Runs the cleanup action if it has not run yet.</summary>
            public void Dispose()
            {
                var action = dispose;
                if (action != null)
                {
                    dispose = null;
                    action();
                }
            }

            public IEnumerator<T> GetEnumerator() => new DisposingEnumerator(enumerable.GetEnumerator(), this);

            IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

            /// <summary>
            /// Forwards to the inner enumerator and additionally disposes the owning
            /// <see cref="DisposableEnumerable{T}"/> when the enumerator is disposed, so that callers
            /// that never cast the sequence to <see cref="IDisposable"/> still release the resource.
            /// </summary>
            private sealed class DisposingEnumerator : IEnumerator<T>
            {
                private readonly IEnumerator<T> inner;
                private readonly DisposableEnumerable<T> owner;

                public DisposingEnumerator(IEnumerator<T> inner, DisposableEnumerable<T> owner)
                {
                    this.inner = inner;
                    this.owner = owner;
                }

                public T Current => inner.Current;

                object IEnumerator.Current => inner.Current;

                public bool MoveNext() => inner.MoveNext();

                public void Reset() => inner.Reset();

                public void Dispose()
                {
                    try
                    {
                        inner.Dispose();
                    }
                    finally
                    {
                        owner.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Enumerate over all records in the file. For each record store the bytes,
        /// the start position in the stream, length in bytes and the deserialized object.
        /// </summary>
        /// <remarks>
        /// Useful for debugging and analyzing binary logs.
        /// The file is opened immediately and is closed when the returned sequence (or an enumerator
        /// obtained from it) is disposed.
        /// </remarks>
        /// <param name="logFilePath">The full file path of the binary log file</param>
        public IEnumerable<Record> ReadRecords(string logFilePath)
        {
            var stream = new FileStream(logFilePath, FileMode.Open, FileAccess.Read, FileShare.Read);
            try
            {
                return DisposableEnumerable<Record>.Create(ReadRecords(stream), () => stream.Dispose());
            }
            catch
            {
                // Don't leak the file handle if building the reading pipeline fails.
                stream.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Enumerate over all records in a compressed binary log held in memory.
        /// </summary>
        /// <param name="bytes">The complete contents of a binary log file.</param>
        public IEnumerable<Record> ReadRecords(byte[] bytes)
        {
            var stream = new MemoryStream(bytes);
            return ReadRecords(stream);
        }

        /// <summary>
        /// Enumerate over all records in the binary log stream. For each record store the bytes,
        /// the start position in the stream, length in bytes and the deserialized object.
        /// </summary>
        /// <remarks>Useful for debugging and analyzing binary logs</remarks>
        /// <param name="binaryLogStream">The compressed binary log stream. It is left open.</param>
        public IEnumerable<Record> ReadRecords(Stream binaryLogStream)
        {
            var gzipStream = new GZipStream(binaryLogStream, CompressionMode.Decompress, leaveOpen: true);
            var bufferedStream = new BufferedStream(gzipStream, 32768);
            return ReadRecordsFromDecompressedStream(bufferedStream);
        }

        /// <summary>
        /// Enumerate over all records in an already decompressed binary log stream.
        /// Records for BuildEventArgs are yielded as they are read; records for embedded blobs are
        /// collected while reading and yielded after the last BuildEventArgs record.
        /// </summary>
        /// <remarks>
        /// This is a lazy iterator: nothing is read (and the file format version is not validated)
        /// until enumeration starts. Exceptions from the reader propagate to the caller;
        /// <see cref="OnException"/> is not raised here.
        /// </remarks>
        /// <param name="decompressedStream">The decompressed binary log contents.</param>
        public IEnumerable<Record> ReadRecordsFromDecompressedStream(Stream decompressedStream)
        {
            // The wrapper counts bytes read so that record offsets and lengths can be computed
            // even when the underlying stream does not expose a position.
            var wrapper = new WrapperStream(decompressedStream);
            var binaryReader = new BinaryReader(wrapper);

            int fileFormatVersion = binaryReader.ReadInt32();

            EnsureFileFormatVersionKnown(fileFormatVersion);

            List<Record> blobs = new List<Record>();

            using var reader = new BuildEventArgsReader(binaryReader, fileFormatVersion);

            // forward the events from the reader to the subscribers of this class
            reader.OnBlobRead += OnBlobRead;

            // Position (in decompressed bytes) where the item currently being read started.
            // Shared with the handlers below, which advance it past auxiliary records.
            long start = 0;

            reader.OnBlobRead += (kind, blob) =>
            {
                start = wrapper.Position;

                var record = new Record
                {
                    Bytes = blob,
                    Args = null,
                    Start = start - blob.Length, // TODO: check if this is accurate
                    Length = blob.Length
                };

                blobs.Add(record);
            };

            reader.OnStringRead += text =>
            {
                long length = wrapper.Position - start;

                // re-read the current position as we're just about to start reading
                // the actual BuildEventArgs record
                start = wrapper.Position;

                OnStringRead?.Invoke(text, length);
            };

            reader.OnNameValueListRead += list =>
            {
                long length = wrapper.Position - start;
                start = wrapper.Position;
                OnNameValueListRead?.Invoke(list, length);
            };

            while (true)
            {
                BuildEventArgs instance = null;

                start = wrapper.Position;

                instance = reader.Read();
                if (instance == null)
                {
                    break;
                }

                var record = new Record
                {
                    Bytes = null, // probably can reconstruct this from the Args if necessary
                    Args = instance,
                    Start = start,
                    Length = wrapper.Position - start
                };

                yield return record;
            }

            foreach (var blob in blobs)
            {
                yield return blob;
            }
        }
    }

    /// <summary>
    /// A pass-through stream that counts the bytes returned by <see cref="Read"/> and exposes
    /// that count as <see cref="Position"/>.
    /// </summary>
    public class WrapperStream : Stream
    {
        private readonly Stream stream;

        // Total number of bytes returned by Read so far.
        private long position;

        public WrapperStream(Stream stream)
        {
            this.stream = stream;
        }

        public override bool CanRead => stream.CanRead;

        public override bool CanSeek => stream.CanSeek;

        public override bool CanWrite => stream.CanWrite;

        public override long Length => stream.Length;

        /// <summary>
        /// Gets the number of bytes read through this wrapper. Setting the position is not implemented.
        /// </summary>
        public override long Position
        {
            get => position;
            set => throw new NotImplementedException();
        }

        public override void Flush()
        {
            stream.Flush();
        }

        /// <summary>Reads from the inner stream and advances <see cref="Position"/> by the number of bytes read.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            var result = stream.Read(buffer, offset, count);
            position += result;
            return result;
        }

        /// <summary>
        /// Forwards the seek to the inner stream. Note that the byte count reported by
        /// <see cref="Position"/> is not adjusted.
        /// </summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            return stream.Seek(offset, origin);
        }

        public override void SetLength(long value)
        {
            stream.SetLength(value);
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            stream.Write(buffer, offset, count);
        }
    }
}