Source/Tx.Core/PullMergeSort.cs (127 lines of code) (raw):

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections; using System.Collections.Generic; namespace System.Reactive { public class PullMergeSort<T> : IEnumerable<T> { private readonly List<IEnumerator<T>> _inputs; private readonly Func<T, DateTime> _keyFunction; /// <summary> /// Initializes a new instance of the <see cref="PullMergeSort{T}"/> class. /// </summary> /// <param name="keyFunction">Time stamp getter function.</param> /// <param name="inputs">The collection of sequences of <see> /// <cref>T</cref> /// </see> /// elements.</param> public PullMergeSort(Func<T, DateTime> keyFunction, IEnumerable<IEnumerator<T>> inputs) { _keyFunction = keyFunction; _inputs = new List<IEnumerator<T>>(inputs); } public IEnumerator<T> GetEnumerator() { return new Enumerator(this, _inputs); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } private class Enumerator : IEnumerator<T> { private readonly List<Reader> _inputs; private readonly PullMergeSort<T> _parent; private T _current; private bool _initialized; public Enumerator(PullMergeSort<T> parent, IEnumerable<IEnumerator<T>> inputs) { _parent = parent; _inputs = new List<Reader>(); foreach (var i in inputs) { _inputs.Add(new Reader(i)); } } public bool MoveNext() { if (!_initialized) { foreach (Reader reader in _inputs) { reader.ReadOne(); } _initialized = true; } Reader streamToRead = FindStreamToRead(); if (streamToRead == null) return false; _current = streamToRead.Next; streamToRead.ReadOne(); return true; } public T Current { get { return _current; } } public void Dispose() { foreach (Reader input in _inputs) { input.Dispose(); } } object IEnumerator.Current { get { return Current; } } public void Reset() { throw new NotImplementedException(); } private Reader FindStreamToRead() { Reader streamToRead = null; DateTime earliestTimestamp = DateTime.MaxValue; var toRemove = new List<Reader>(); foreach (Reader s in _inputs) { if (s.IsCompleted) { toRemove.Add(s); continue; } DateTime timestamp = _parent._keyFunction(s.Next); if (timestamp < earliestTimestamp) { earliestTimestamp = timestamp; streamToRead = s; } } foreach (Reader r in toRemove) { _inputs.Remove(r); r.Dispose(); } return streamToRead; } } private sealed class Reader : IDisposable { private readonly IEnumerator<T> _enumerator; private bool _isCompleted; public Reader(IEnumerator<T> enumerator) { _enumerator = enumerator; } public bool IsCompleted { get { return _isCompleted; } } public T Next { get { return _enumerator.Current; } } public void Dispose() { _enumerator.Dispose(); } public void ReadOne() { _isCompleted = !_enumerator.MoveNext(); } } } }