Source/Tx.Core/Csv/CsvObservable.cs (109 lines of code) (raw):

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. namespace System.Reactive { using System; using System.Collections.Generic; using System.Collections.ObjectModel; using System.IO; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Text; public sealed class CsvObservable { private readonly char _columnSeparator; private readonly int _numberRecordsToSkip; public CsvObservable() : this(',', 1) { } public CsvObservable(char columnSeparator, int numberRecordsToSkip) { this._columnSeparator = columnSeparator; this._numberRecordsToSkip = numberRecordsToSkip; } public IObservable<Record> FromFiles(params string[] files) { return files.SelectMany(this.ReadRecords) .ToObservable(Scheduler.Default); } private IEnumerable<Record> ReadRecords(string fileName) { var stringBuilder = new StringBuilder(); using(var stream = File.OpenRead(fileName)) using (var reader = new StreamReader(stream)) { ReadOnlyCollection<string> header; if (reader.Peek() >= 0) { var first = this.ParseLine(reader.ReadLine(), stringBuilder).ToArray(); header = new ReadOnlyCollection<string>(first); } else { yield break; } for (var i = 0; i < this._numberRecordsToSkip && reader.Peek() >= 0; i++) { reader.ReadLine(); } while (reader.Peek() >= 0) { var items = this.ParseLine(reader.ReadLine(), stringBuilder).ToArray(); yield return new Record(header, items); } } } private IEnumerable<string> ParseLine(string input, StringBuilder stringBuilder) { if (string.IsNullOrEmpty(input)) { yield break; } stringBuilder.Clear(); int index = 0; int escapeCount = 0; for (; index < input.Length; index++) { if (input[index] == '"') { escapeCount++; stringBuilder.Append('"'); } else if (input[index] == this._columnSeparator) { if ((escapeCount % 2) == 0) { if (escapeCount == 0) { yield return stringBuilder .ToString(); } else { yield return stringBuilder .Extract('"') .Replace(@"""""", @""""); } stringBuilder.Clear(); escapeCount = 0; } else { stringBuilder.Append(this._columnSeparator); } } else { stringBuilder.Append(input[index]); } } if (escapeCount == 0) { yield return stringBuilder .ToString(); } else { yield return stringBuilder .Extract('"') .Replace(@"""""", @""""); } } } internal static class StringBuilderExtensions { public static string Extract(this StringBuilder input, char character) { var startIndex = input.IndexOf(character); var lastIndex = input.LastIndexOf(character); var result = input.ToString( startIndex + 1, lastIndex - startIndex - 1); return result; } public static int LastIndexOf(this StringBuilder input, char character) { for (int i = input.Length - 1; i >= 0; i--) { if (input[i] == character) { return i; } } return -1; } public static int IndexOf(this StringBuilder input, char character) { for (int i = 0; i < input.Length; i++) { if (input[i] == character) { return i; } } return -1; } } public sealed class Record { public ReadOnlyCollection<string> Header { get; set; } public string[] Items { get; set; } public Record() { } public Record(ReadOnlyCollection<string> header, string[] items) { this.Header = header; this.Items = items; } } }