Source/Tx.Core/Demultiplexor.cs (109 lines of code) (raw):
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Reflection;
namespace System.Reactive
{
/// <summary>
/// Efficiently demultiplexes input sequence of objects into output sequences of fixed types
/// The callbacks on the output sequences are called in the order of occurence of input events
/// OnNext of the Demultiplexor should not be called from multiple threads
/// </summary>
public class Demultiplexor : IObserver<object>
{
private readonly Dictionary<Type, IObserver<object>> _outputs = new Dictionary<Type, IObserver<object>>();
private readonly Dictionary<Type, List<Type>> _knownOutputMappings = new Dictionary<Type, List<Type>>();
/// <summary>
/// Notifies the observer that the provider has finished sending push-based notifications.
/// </summary>
public void OnCompleted()
{
foreach (var output in _outputs.Values.ToArray())
{
output.OnCompleted();
}
}
/// <summary>
/// Notifies the observer that the provider has experienced an error condition.
/// </summary>
/// <param name="error">An object that provides additional information about the error.</param>
public void OnError(Exception error)
{
foreach (var output in _outputs.Values)
{
output.OnError(error);
}
}
/// <summary>
/// Provides the observer with new data.
/// </summary>
/// <param name="inputObject">The current notification information.</param>
public void OnNext(object inputObject)
{
var inputObjectType = inputObject.GetType();
List<Type> outputKeys;
_knownOutputMappings.TryGetValue(inputObjectType, out outputKeys);
if (outputKeys == null)
{
outputKeys = new List<Type>();
_knownOutputMappings.Add(inputObjectType, outputKeys);
outputKeys.AddRange(GetTypes(inputObjectType).Where(type => _outputs.ContainsKey(type)));
}
foreach (var keyType in outputKeys)
{
_outputs[keyType].OnNext(inputObject);
}
}
/// <summary>
/// Returns an output sequence of given type
/// </summary>
/// <typeparam name="TOutput">The desired type</typeparam>
/// <returns>Sequence in which all events are of type TOutput</returns>
public IObservable<TOutput> GetObservable<TOutput>()
{
IObserver<object> o;
if (!_outputs.TryGetValue(typeof(TOutput), out o))
{
o = new OutputSubject<TOutput>();
_outputs.Add(typeof(TOutput), o);
RefreshKnownOutputMappings(typeof(TOutput));
}
var output = (IObservable<TOutput>)o;
return output;
}
private static List<Type> GetTypes(Type inputType)
{
var typeList = new List<Type>();
var temp = inputType;
while (temp != typeof(object))
{
typeList.Add(temp);
temp = temp.GetTypeInfo().BaseType;
}
typeList.AddRange(inputType.GetTypeInfo().ImplementedInterfaces);
return typeList;
}
private void RefreshKnownOutputMappings(Type outputType)
{
foreach (var knownMappings in _knownOutputMappings)
{
if (GetTypes(knownMappings.Key).Contains(outputType) && !knownMappings.Value.Contains(outputType))
{
knownMappings.Value.Add(outputType);
}
}
}
private sealed class OutputSubject<T> : ISubject<object, T>, IDisposable
{
private readonly Subject<T> _subject;
private int _refcount;
public OutputSubject()
{
_subject = new Subject<T>();
}
public void Dispose()
{
_refcount--;
//if (_refcount == 0)
//{
// _parent._outputs.Remove(typeof(T));
//}
}
/// <summary>
/// Notifies the observer that the provider has finished sending push-based notifications.
/// </summary>
public void OnCompleted()
{
_subject.OnCompleted();
}
/// <summary>
/// Notifies the observer that the provider has experienced an error condition.
/// </summary>
/// <param name="error">An object that provides additional information about the error.</param>
public void OnError(Exception error)
{
_subject.OnError(error);
}
/// <summary>
/// Provides the observer with new data.
/// </summary>
/// <param name="value">The current notification information.</param>
public void OnNext(object value)
{
_subject.OnNext((T)value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
IDisposable subscription = _subject.Subscribe(observer);
_refcount++;
return new CompositeDisposable(subscription, this);
}
}
}
}