Source/Tx.Bond/BinaryEtlReader.cs (108 lines of code) (raw):

namespace Tx.Bond { using System; using System.Linq; using System.Reactive; using System.Reactive.Linq; public sealed class BinaryEtlReader : IObservable<IEnvelope> { private readonly bool useSequentialReader; private DateTime? startTime; private DateTime? endTime; private readonly string[] files; public BinaryEtlReader( bool useSequentialReader, params string[] files) { this.useSequentialReader = useSequentialReader; this.endTime = null; this.startTime = null; this.files = files; this.ValidateConfiguration(); } public BinaryEtlReader( bool useSequentialReader, DateTime startTime, DateTime endTime, params string[] files) { this.useSequentialReader = useSequentialReader; this.endTime = endTime; this.startTime = startTime; this.files = files; this.ValidateConfiguration(); } private void ValidateConfiguration() { if (this.startTime.HasValue != this.endTime.HasValue) { throw new ArgumentException("Specify both start and end times or leave both of them null."); } if (this.startTime.HasValue && this.startTime.Value >= this.endTime.Value) { throw new ArgumentException("Start time should be less than end time."); } if (this.files == null) { throw new ArgumentNullException("files"); } if (this.files.Length == 0) { throw new ArgumentException("Files parameter should contain at least one element.", "files"); } foreach (var path in this.files) { if (!PathUtils.IsValidPath(path)) { throw new ArgumentException("Files parameter contains invalid element - " + path + ".", "files"); } } } /// <summary>Notifies the provider that an observer is to receive notifications.</summary> /// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns> /// <param name="observer">The object that is to receive notifications.</param> /// <exception cref="System.ArgumentNullException">observer is null.</exception> public IDisposable Subscribe(IObserver<IEnvelope> observer) { if (observer == null) { throw new ArgumentNullException(nameof(observer)); } this.ValidateConfiguration(); var flattenFiles = this.files .SelectMany(PathUtils.FlattenIfNeeded) .ToArray(); if (flattenFiles.Length == 0) { return Observable.Empty<IEnvelope>() .SubscribeSafe(observer); } IObservable<IEnvelope> observable; if (this.useSequentialReader) { if (this.startTime.HasValue) { observable = BinaryEtwObservable.FromSequentialFiles( this.startTime.Value, this.endTime.Value, flattenFiles); } else { observable = BinaryEtwObservable.FromSequentialFiles(flattenFiles); } } else { if (this.startTime.HasValue) { observable = BinaryEtwObservable.FromFiles( this.startTime.Value, this.endTime.Value, flattenFiles); } else { observable = BinaryEtwObservable.FromFiles(flattenFiles); } } return observable.SubscribeSafe(observer); } } }