Source/Tx.Core/CompositeDeserializer.cs (99 lines of code) (raw):

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections.Generic; using System.Linq; using System.Reflection; namespace System.Reactive { public class CompositeDeserializer<TInput> : IObserver<TInput>, IDeserializer { private readonly List<IDeserializer<TInput>> _deserializers; private IObserver<Timestamped<object>> _observer; public DateTime StartTime { get; set; } public DateTime EndTime { get; set; } public CompositeDeserializer( IObserver<Timestamped<object>> observer, params ITypeMap<TInput>[] typeMaps) { _observer = observer; _deserializers = new List<IDeserializer<TInput>>(); foreach (ITypeMap<TInput> mapInstance in typeMaps) { Type mapInterface = mapInstance.GetType().GetTypeInfo().ImplementedInterfaces .FirstOrDefault(i => i.Name == typeof(IPartitionableTypeMap<,>).Name); if (mapInterface != null) { Type deserializerType = typeof (PartitionKeyDeserializer<,>).MakeGenericType(mapInterface.GenericTypeArguments); object deserializerInstance = Activator.CreateInstance(deserializerType, mapInstance); _deserializers.Add((IDeserializer<TInput>) deserializerInstance); continue; } mapInterface = mapInstance.GetType().GetTypeInfo().ImplementedInterfaces .FirstOrDefault(i => i.Name == typeof(IRootTypeMap<,>).Name); if (mapInterface != null) { Type deserializerType = typeof (RootDeserializer<,>).MakeGenericType(mapInterface.GenericTypeArguments); object deserializerInstance = Activator.CreateInstance(deserializerType, mapInstance); _deserializers.Add((IDeserializer<TInput>) deserializerInstance); continue; } mapInterface = mapInstance.GetType().GetTypeInfo().ImplementedInterfaces .FirstOrDefault(i => i.Name == typeof(ITypeMap<>).Name); if (mapInterface != null) { Type deserializerType = typeof (TransformDeserializer<>).MakeGenericType(mapInterface.GenericTypeArguments); object deserializerInstance = Activator.CreateInstance(deserializerType, mapInstance); _deserializers.Add((IDeserializer<TInput>) deserializerInstance); continue; } throw new Exception("The type " + mapInstance.GetType().FullName + " must implement one of these interfaces :" + typeof (ITypeMap<>).Name + ", " + typeof (IRootTypeMap<,>).Name + ", " + typeof (IPartitionableTypeMap<,>).Name); } } public void AddKnownType(Type type) { foreach (var d in _deserializers) { d.AddKnownType(type); } } public void SetOutput(IObserver<Timestamped<object>> observer) { _observer = observer; } /// <summary> /// Notifies the observer that the provider has finished sending push-based notifications. /// </summary> public void OnCompleted() { _observer.OnCompleted(); } /// <summary> /// Notifies the observer that the provider has experienced an error condition. /// </summary> /// <param name="error">An object that provides additional information about the error.</param> public void OnError(Exception error) { _observer.OnError(error); } /// <summary> /// Provides the observer with new data. /// </summary> /// <param name="value">The current notification information.</param> public void OnNext(TInput value) { DateTimeOffset? timestamp = null; foreach (var d in _deserializers) { Timestamped<object> ts; if (d.TryDeserialize(value, out ts)) { if (timestamp.HasValue && timestamp.Value != ts.Timestamp) { _observer.OnError(new Exception("Several type maps return different timestamps for the same source event.")); return; } timestamp = ts.Timestamp; // TODO: this achieves the right semantics, // but the performance will be sub optimal if (ts.Timestamp.DateTime < StartTime) return; if (ts.Timestamp.DateTime > EndTime) return; _observer.OnNext(ts); } } } } }