Source/Tx.Core/Playback.cs (237 lines of code) (raw):
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Playback serves two purposes:
/// (1) Replay the history from one or more trace/log files, by turning the event occurence records into C# instances
/// (2) Deliver events from fixed set of real time sessions as C# instances
/// The invariants that Playback preserves are:
/// (a) The order within one input file/session (inputs events must be in order of occurence, and have increasing timestamps)
/// (b) The illusion of global order - merging different files/streams on timestamp
/// </summary>
public abstract class PlaybackBase : ITimeSource, IPlaybackConfiguration, IDisposable
{
protected readonly Demultiplexor _demux;
protected readonly List<IInputStream> _inputs;
private readonly List<IDisposable> _outputBuffers;
private readonly Stopwatch _stopwatch;
private readonly Subject<Timestamped<object>> _subject = new Subject<Timestamped<object>>();
private readonly TimeSource<Timestamped<object>> _timeSource;
private readonly IDisposable _toDemux;
private PullMergeSort<Timestamped<object>> _mergesort;
private OutputPump<Timestamped<object>> _pump;
private ManualResetEvent _pumpStart;
private DateTime _startTime = DateTime.MinValue;
/// <summary>
/// Constructor
/// </summary>
protected PlaybackBase()
{
EndTime = DateTime.MaxValue;
_inputs = new List<IInputStream>();
_demux = new Demultiplexor();
_timeSource = new TimeSource<Timestamped<object>>(_subject, e => e.Timestamp);
_toDemux = _timeSource
.Select(e => e.Value)
.Subscribe(_demux);
_outputBuffers = new List<IDisposable>();
_stopwatch = new Stopwatch();
}
/// <summary>
/// Gets or sets the start time for the playback
/// The setter must be called before any operators that take Scheduler are used
///
/// Playback will only deliver event after the given start time
/// </summary>
public DateTime StartTime
{
get { return _startTime; }
set
{
_startTime = value;
_timeSource.StartTime = new DateTimeOffset(value);
}
}
// set this to ignore events past given time
public DateTime EndTime { get; set; }
/// <summary>
/// The event types that are known
/// If you do playback.GetObservable<A>(); playback.GetObservable<B>();
/// the known types will be A and B
/// Only known event types can be formatted to text
/// Be sure to set the known types before calling Start() or Run()
/// </summary>
public Type[] KnownTypes { get; set; }
/// <summary>
/// The time elapsed
/// - from calling Start() or Run(),
/// - to the current time (if processing is in progress) or the end of processing (e.g. Run() returns)
/// </summary>
public TimeSpan ExecutionDuration
{
get { return _stopwatch.Elapsed; }
}
public void Dispose()
{
foreach (IInputStream input in _inputs)
{
((IDisposable) input).Dispose();
}
if (null != _pump)
_pump.Dispose();
if (null != _pumpStart)
_pumpStart.Dispose();
if (null != _subject)
_subject.Dispose();
if (null != _toDemux)
_toDemux.Dispose();
}
/// <summary>
/// Low level method for adding input sequence to the playback
/// Usually, this will be called only from extension methods of Playback
/// </summary>
/// <typeparam name="TInput">Universal type that can can contain events of different actual (static) types</typeparam>
/// <param name="createInput">How to create the input observalbe</param>
/// <param name="typeMaps">The available type map types</param>
void IPlaybackConfiguration.AddInput<TInput>(
Expression<Func<IObservable<TInput>>> createInput,
params Type[] typeMaps)
{
var mapInstances = new ITypeMap<TInput>[typeMaps.Length];
for (int i = 0; i < typeMaps.Length; i++)
{
object o = Activator.CreateInstance(typeMaps[i]);
if (o == null)
throw new Exception("Activator.CreateInstance failed for type " + typeMaps[i].Name);
ITypeMap<TInput> mapInstance = o as ITypeMap<TInput>;
if (mapInstance == null)
throw new Exception("The type " + typeMaps[i].FullName + " must implement one of these interfaces :"
+ typeof(ITypeMap<>).Name + ", "
+ typeof(IRootTypeMap<,>).Name + ", "
+ typeof(IPartitionableTypeMap<,>).Name);
mapInstances[i] = mapInstance;
}
var input = new InputStream<TInput>(createInput, StartTime, EndTime, mapInstances);
_inputs.Add(input);
}
/// <summary>
/// Low level method for adding input sequence to the playback
/// Usually, this will be called only from extension methods of Playback
/// </summary>
/// <typeparam name="TInput">Universal type that can can contain events of different actual (static) types</typeparam>
/// <param name="createInput">How to create the input observalbe</param>
/// <param name="typeMaps">The available type maps (local instances)</param>
void IPlaybackConfiguration.AddInput<TInput>(
Expression<Func<IObservable<TInput>>> createInput,
params ITypeMap<TInput>[] typeMaps)
{
var input = new InputStream<TInput>(createInput, StartTime, EndTime, typeMaps);
_inputs.Add(input);
}
/// <summary>
/// Scheduler that represents virtual time as per the timestamps on the events
/// Use playback.Scheduler as argument to temporal primitives like Window or Take
/// </summary>
public IScheduler Scheduler
{
get { return _timeSource.Scheduler; }
}
public IObservable<Timestamped<object>> GetAll(params Type[] types)
{
foreach (IInputStream i in _inputs)
{
foreach (Type t in types)
{
i.AddKnownType(t);
}
}
return _timeSource;
}
/// <summary>
/// Starts the playback and returns immediately
/// The main use case is real-time feeds.
/// </summary>
public void Start()
{
foreach (IInputStream i in _inputs)
{
i.StartTime = StartTime;
i.EndTime = EndTime;
if (KnownTypes == null)
continue;
foreach (Type t in KnownTypes)
{
i.AddKnownType(t);
}
}
if (_inputs.Count == 0)
throw new Exception("No input sequences were added to the Playback");
if (_inputs.Count > 1)
{
IEnumerator<Timestamped<object>>[] queues = (from i in _inputs select i.Output).ToArray();
_mergesort = new PullMergeSort<Timestamped<object>>(e => e.Timestamp.DateTime, queues);
_timeSource.Connect();
_pumpStart = new ManualResetEvent(false);
_pump = new OutputPump<Timestamped<object>>(_mergesort, _subject, _pumpStart);
_pumpStart.Set();
_stopwatch.Start();
foreach (IInputStream i in _inputs)
{
i.Start();
}
}
else
{
_timeSource.Connect();
IInputStream singleInput = _inputs[0];
_stopwatch.Start();
singleInput.Start(_subject);
}
}
/// <summary>
/// Starts the playback, and blocks until rocessing of input is completed
/// </summary>
public void Run()
{
Start();
_timeSource.Completed.WaitOne();
_stopwatch.Stop();
}
/// <summary>
/// BufferOutput lets you accumulate a small collection that is the result of stream processing
/// </summary>
/// <typeparam name="TOutput">The event type of interest</typeparam>
/// <param name="observable">the results to accumulate</param>
/// <returns></returns>
public IEnumerable<TOutput> BufferOutput<TOutput>(IObservable<TOutput> observable)
{
var list = new List<TOutput>();
IDisposable d = observable.Subscribe(list.Add);
_outputBuffers.Add(d);
return list;
}
~PlaybackBase()
{
Dispose();
}
protected interface IInputStream
{
IEnumerator<Timestamped<object>> Output { get; }
void AddKnownType(Type t);
void Start();
void Start(IObserver<Timestamped<object>> observer);
DateTime StartTime { get; set; }
DateTime EndTime { get; set; }
}
protected class InputStream<TInput> : IInputStream, IDisposable
{
private readonly BufferQueue<Timestamped<object>> _output;
private readonly IObservable<TInput> _source;
private IDisposable _subscription;
private CompositeDeserializer<TInput> _deserializer;
public InputStream(
Expression<Func<IObservable<TInput>>> createInput,
DateTime startTime,
DateTime endTime,
params ITypeMap<TInput>[] typeMaps)
{
_source = createInput.Compile()();
_output = new BufferQueue<Timestamped<object>>();
_deserializer = new CompositeDeserializer<TInput>(_output, typeMaps);
}
public DateTime StartTime
{
get { return _deserializer.StartTime; }
set { _deserializer.StartTime = value; }
}
public DateTime EndTime
{
get { return _deserializer.EndTime; }
set { _deserializer.EndTime = value; }
}
public void Dispose()
{
if (_subscription != null)
{
_subscription.Dispose();
}
((IDisposable) _output).Dispose();
}
public void AddKnownType(Type t)
{
((IDeserializer) _deserializer).AddKnownType(t);
}
public IEnumerator<Timestamped<object>> Output
{
get { return _output; }
}
public void Start()
{
_subscription = _source.Subscribe(_deserializer);
}
public void Start(IObserver<Timestamped<object>> observer)
{
_deserializer.SetOutput(observer);
_subscription = _source.Subscribe(_deserializer);
}
}
}
public class Playback : PlaybackBase, IPlayback
{
/// <summary>
/// Call this to get just the events of given type
/// </summary>
/// <typeparam name="TOutput">The type of interest</typeparam>
/// <returns>Sequence of events of type TOutput from all inputs added to the playback</returns>
public IObservable<TOutput> GetObservable<TOutput>()
{
foreach (IInputStream i in _inputs)
{
i.AddKnownType(typeof(TOutput));
}
return _demux.GetObservable<TOutput>();
}
}
}