Source/Tx.Core/TimeSource.cs (200 lines of code) (raw):
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Subjects;
using System.Threading;
namespace System.Reactive
{
public interface ITimeSource
{
IScheduler Scheduler { get; }
}
/// <summary>
/// TimeSource constructs an "Virtual Time" scheduler based on expression over the event data
/// </summary>
/// <typeparam name="T">Type of the events in the sequence</typeparam>
public class TimeSource<T> : IConnectableObservable<T>, ITimeSource, IDisposable
{
private readonly TimeSegmentScheduler _scheduler;
private readonly IObservable<T> _source;
private readonly Subject<T> _subject;
private readonly Func<T, DateTimeOffset> _timeFunction;
private readonly ManualResetEvent _completed;
/// <summary>
/// Initializes a new instance of the <see cref="TimeSource{T}"/> class.
/// </summary>
/// <param name="source">The event sequence to use as source</param>
/// <param name="timeFunction">Expression to extract the timestamp</param>
public TimeSource(
IObservable<T> source,
Func<T, DateTimeOffset> timeFunction)
{
if (timeFunction == null)
throw new ArgumentNullException(nameof(timeFunction));
_source = source;
_scheduler = new TimeSegmentScheduler();
_timeFunction = timeFunction;
_subject = new Subject<T>();
_completed = new ManualResetEvent(false);
}
public DateTimeOffset StartTime
{
get { return _scheduler.Now; }
set
{
if (!_scheduler._running)
_scheduler.Init(value);
else
_scheduler.AdvanceTo(value);
}
}
public WaitHandle Completed
{
get { return _completed; }
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _subject.Subscribe(observer);
}
public IDisposable Connect()
{
return _source.Subscribe(OnNext, OnError, OnCompleted);
}
public IScheduler Scheduler
{
get { return _scheduler; }
}
private void OnCompleted()
{
_subject.OnCompleted();
_scheduler.Stop();
_completed.Set();
}
private void OnError(Exception error)
{
_subject.OnError(error);
_completed.Set();
}
private void OnNext(T value)
{
DateTimeOffset time = _timeFunction(value);
if (!_scheduler._running)
_scheduler.Init(time);
else if (time > _scheduler.Now)
{
_scheduler.AdvanceTo(time);
}
_subject.OnNext(value);
}
public void Dispose()
{
if (_completed != null) _completed.Dispose();
}
private sealed class TimeSegmentScheduler : IScheduler
{
private readonly HistoricalScheduler _historical = new HistoricalScheduler();
private readonly List<IPostponedWorkItem> _postponed;
public bool _running;
public TimeSegmentScheduler()
{
_postponed = new List<IPostponedWorkItem>();
}
public void Stop()
{
_historical.AdvanceBy(TimeSpan.FromTicks(1));
}
public DateTimeOffset Now
{
get
{
if (!_running)
{
throw new NotImplementedException();
}
return _historical.Clock;
}
}
public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
Func<IScheduler, TState, IDisposable> action)
{
if (_running)
{
return _historical.ScheduleAbsolute(state, dueTime, action);
}
return new PostponedWorkItem<TState>(this, state, dueTime, action);
}
public IDisposable Schedule<TState>(TState state, TimeSpan relativeTime,
Func<IScheduler, TState, IDisposable> action)
{
if (_running)
{
return _historical.ScheduleRelative(state, relativeTime, action);
}
return new PostponedWorkItem<TState>(this, state, relativeTime, action);
}
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
return Schedule(state, TimeSpan.Zero, action);
}
public void Init(DateTimeOffset startTime)
{
_historical.AdvanceTo(startTime);
_running = true;
foreach (IPostponedWorkItem item in _postponed)
{
item.Reschedule(_historical);
}
}
public void AdvanceTo(DateTimeOffset value)
{
_historical.AdvanceTo(value);
}
private interface IPostponedWorkItem
{
void Reschedule(HistoricalScheduler historical);
}
private class PostponedWorkItem<TState> : IPostponedWorkItem, IDisposable
{
private readonly Func<IScheduler, TState, IDisposable> _action;
private readonly TimeSegmentScheduler _parent;
private readonly TimeSpan _relativeTime; // relative to start of playback
private readonly TState _state;
private IDisposable _disposable;
private DateTimeOffset? _dueTime; // only set one of _dueTime or _relativeTime
public PostponedWorkItem(TimeSegmentScheduler parent, TState state, DateTimeOffset dueTime,
Func<IScheduler, TState, IDisposable> action)
{
_parent = parent;
_state = state;
_dueTime = dueTime;
_action = action;
_parent._postponed.Add(this);
}
public PostponedWorkItem(TimeSegmentScheduler parent, TState state, TimeSpan relativeTime,
Func<IScheduler, TState, IDisposable> action)
{
_parent = parent;
_state = state;
_relativeTime = relativeTime;
_action = action;
_parent._postponed.Add(this);
}
public void Dispose()
{
_parent._postponed.Remove(this);
if (_disposable != null)
_disposable.Dispose();
}
public void Reschedule(HistoricalScheduler historical)
{
if (_dueTime.HasValue)
{
_disposable = historical.Schedule(_state, _dueTime.Value, _action);
}
else
{
if (this._relativeTime > TimeSpan.Zero)
{
this._disposable = historical.Schedule(this._state, this._relativeTime, this._action);
}
else
{
this._disposable = this._action(historical, this._state);
}
}
}
}
}
}
}