in Source/Tx.Bond/BinaryEtlReader.cs [79:130]
public IDisposable Subscribe(IObserver<IEnvelope> observer)
{
    // Attaches the observer to an envelope stream built from the configured files
    // and returns the subscription handle produced by SubscribeSafe.
    //
    // Throws ArgumentNullException when observer is null. ValidateConfiguration()
    // runs before any file handling; whatever it throws propagates to the caller.
    if (observer == null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    this.ValidateConfiguration();

    // Every configured entry is passed through PathUtils.FlattenIfNeeded and the
    // results are materialized once, so the factories below all see the same array.
    var inputPaths = this.files.SelectMany(PathUtils.FlattenIfNeeded).ToArray();

    IObservable<IEnvelope> source;

    if (inputPaths.Length == 0)
    {
        // Nothing to read: hand the observer an empty sequence instead of calling a factory.
        source = Observable.Empty<IEnvelope>();
    }
    else if (!this.startTime.HasValue)
    {
        // No start time configured: use the overloads that take only the file list.
        if (this.useSequentialReader)
        {
            source = BinaryEtwObservable.FromSequentialFiles(inputPaths);
        }
        else
        {
            source = BinaryEtwObservable.FromFiles(inputPaths);
        }
    }
    else
    {
        // A start time is configured: use the overloads that also take the time bounds.
        // NOTE(review): endTime.Value is read without its own HasValue check here;
        // presumably ValidateConfiguration guarantees both bounds are set together - confirm.
        if (this.useSequentialReader)
        {
            source = BinaryEtwObservable.FromSequentialFiles(
                this.startTime.Value,
                this.endTime.Value,
                inputPaths);
        }
        else
        {
            source = BinaryEtwObservable.FromFiles(
                this.startTime.Value,
                this.endTime.Value,
                inputPaths);
        }
    }

    return source.SubscribeSafe(observer);
}