Source/Tx.Core/Csv/CsvExtensions.cs (112 lines of code) (raw):

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections; using System.IO; using System.Reflection; namespace System.Reactive { public static class CsvExtensions { public static IDisposable ToCsvFile<T>(this IObservable<T> source, string filePath) { return source.Subscribe(new TextFileWriter<T>(", ", filePath)); } public static IDisposable ToTsvFile<T>(this IObservable<T> source, string filePath) { return source.Subscribe(new TextFileWriter<T>("\t", filePath)); } internal sealed class TextFileWriter<T> : IObserver<T>, IDisposable { private readonly string _separator; private StreamWriter _writer; private bool _wroteHeader = false; public TextFileWriter(string separator, string filePath) { _separator = separator; _writer = File.CreateText(filePath); } /// <summary> /// Notifies the observer that the provider has finished sending push-based notifications. /// </summary> public void OnCompleted() { _writer.Dispose(); } /// <summary> /// Notifies the observer that the provider has experienced an error condition. /// </summary> /// <param name="error">An object that provides additional information about the error.</param> public void OnError(Exception error) { throw error; } /// <summary> /// Provides the observer with new data. /// </summary> /// <param name="value">The current notification information.</param> public void OnNext(T value) { if (!_wroteHeader) { WriteHeader(value); _wroteHeader = true; } bool isFirst = true; foreach (var p in typeof(T).GetTypeInfo().DeclaredProperties) { if (isFirst) isFirst = false; else _writer.Write(_separator); var propValue = p.GetValue(value, new object[] { }); if (propValue == null) continue; IDictionary dictionary = propValue as IDictionary; if (dictionary == null) _writer.Write(propValue.ToString()); else WriteValuesAsRow(dictionary); } _writer.WriteLine(); } public void Dispose() { if (_writer != null) { _writer.Dispose(); _writer = null; } } void WriteHeader(T firstValue) { bool isFirst = true; foreach (var p in typeof(T).GetTypeInfo().DeclaredProperties) { if (isFirst) isFirst = false; else _writer.Write(_separator); var propValue = p.GetValue(firstValue, new object[] { }); ; IDictionary dictionary = propValue as IDictionary; if (dictionary == null) _writer.Write(p.Name); else WriteKeysAsColumnTitles(dictionary); } _writer.WriteLine(); } void WriteKeysAsColumnTitles(IDictionary dictionary) { bool isFirst = true; foreach (var key in dictionary.Keys) { if (isFirst) isFirst = false; else _writer.Write(_separator); _writer.Write(key); } } void WriteValuesAsRow(IDictionary dictionary) { bool isFirst = true; foreach (var value in dictionary.Values) { if (isFirst) isFirst = false; else _writer.Write(_separator); _writer.Write(value.ToString()); } } } } }