Source/Tx.Core/PlaybackConfigurationExtensions.cs (134 lines of code) (raw):
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
namespace System.Reactive
{
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
public static class PlaybackConfigurationExtensions
{
/// <summary>
/// Method for adding input sequence that contains .NET objects to the playbackConfiguration
/// </summary>
/// <param name="playbackConfiguration">The playback instance.</param>
/// <param name="source">The input sequence.</param>
public static void AddInput(this IPlaybackConfiguration playbackConfiguration, IEnumerable<Timestamped<object>> source)
{
if (playbackConfiguration == null)
{
throw new ArgumentNullException(nameof(playbackConfiguration));
}
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
playbackConfiguration
.AddInput(
() => source.ToObservable(Scheduler.Default),
typeof(PartitionableTypeMap<object>));
}
/// <summary>
/// Method for adding input sequence from CSV files.
/// </summary>
/// <param name="playback">The playback instance.</param>
/// <param name="transformation">Transformation function converting Record to T</param>
/// <param name="timestampSelector">Timestamp selector function.</param>
/// <param name="files">CSV files containing events ordered by timestamp.</param>
[FileParser("Default CSV parser", ".csv")]
public static void AddCsvInput<T>(
this IPlaybackConfiguration playback,
Func<Record, T> transformation,
Func<Record, DateTimeOffset> timestampSelector,
params string[] files)
{
if (playback == null)
{
throw new ArgumentNullException(nameof(playback));
}
if (transformation == null)
{
throw new ArgumentNullException(nameof(transformation));
}
if (timestampSelector == null)
{
throw new ArgumentNullException(nameof(timestampSelector));
}
if (files == null)
{
throw new ArgumentNullException(nameof(files));
}
AddCsvInput(playback, ',', 0, transformation, timestampSelector, files);
}
/// <summary>
/// Method for adding input sequence from custom CSV files.
/// </summary>
/// <param name="playback">The playback instance.</param>
/// <param name="delimiter"></param>
/// <param name="numberRecordsToSkip"></param>
/// <param name="transformation">Transformation function converting Record to T</param>
/// <param name="timestampSelector">Timestamp selector function.</param>
/// <param name="files">CSV files containing events ordered by timestamp.</param>
[FileParser("Custom CSV parser", ".csv", ".tsv", ".txt")]
public static void AddCsvInput<T>(
this IPlaybackConfiguration playback,
char delimiter,
int numberRecordsToSkip,
Func<Record, T> transformation,
Func<Record, DateTimeOffset> timestampSelector,
params string[] files)
{
if (playback == null)
{
throw new ArgumentNullException(nameof(playback));
}
if (transformation == null)
{
throw new ArgumentNullException(nameof(transformation));
}
if (timestampSelector == null)
{
throw new ArgumentNullException(nameof(timestampSelector));
}
if (files == null)
{
throw new ArgumentNullException(nameof(files));
}
playback.AddInput(
() => new CsvObservable(delimiter, numberRecordsToSkip)
.FromFiles(files)
.Select(item => new Timestamped<T>(transformation(item), timestampSelector(item))),
typeof(PartitionableTypeMap<T>));
}
public static IObservable<TOutput> OfType<TInput, TOutput>(
this IObservable<TInput> source,
params ITypeMap<TInput>[] typeMaps)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
if (typeMaps == null)
{
throw new ArgumentNullException(nameof(typeMaps));
}
return Observable
.Create<Timestamped<object>>(observer =>
{
var deserialzier = new CompositeDeserializer<TInput>(observer, typeMaps);
deserialzier.EndTime = DateTime.MaxValue;
deserialzier.AddKnownType(typeof(TOutput));
return source.SubscribeSafe(deserialzier);
})
.Select(i => i.Value)
.Where(i => i != null)
.OfType<TOutput>();
}
private sealed class PartitionableTypeMap<T> : IPartitionableTypeMap<Timestamped<T>, string>
{
public Func<Timestamped<T>, object> GetTransform(Type outputType)
{
return item => item.Value;
}
public Func<Timestamped<T>, DateTimeOffset> TimeFunction
{
get
{
return item => item.Timestamp;
}
}
public string GetTypeKey(Type outputType)
{
return outputType.FullName;
}
public string GetInputKey(Timestamped<T> evt)
{
return evt.Value.GetType().FullName;
}
public IEqualityComparer<string> Comparer
{
get
{
return StringComparer.Ordinal;
}
}
}
}
}