Source/Tx.Core/Internal/BufferQueue.cs (62 lines of code) (raw):

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; namespace System.Reactive { internal sealed class BufferQueue<T> : IObserver<T>, IEnumerator<T> { private readonly BlockingCollection<T> _queue = new BlockingCollection<T>(); private T _current; private Exception _error; public T Current { get { return _current; } } object IEnumerator.Current { get { return _current; } } public bool MoveNext() { if (_error != null) throw _error; // Bart, I tried with TryTake, and it sometimes returns immediately on empty queue (so tests fail) try { _current = _queue.Take(); return true; } catch (InvalidOperationException) { if (_error != null) throw _error; return false; } } public void Reset() { throw new NotImplementedException(); } void IDisposable.Dispose() { _queue.Dispose(); } public void OnCompleted() { _queue.CompleteAdding(); } public void OnError(Exception error) { _error = error; _queue.CompleteAdding(); } /// <summary> /// Provides the observer with new data. /// </summary> /// <param name="value">The current notification information.</param> public void OnNext(T value) { _queue.Add(value); } public void Dispose() { _queue.Dispose(); } } }