Source/Tx.Core/Internal/Pump.cs (63 lines of code) (raw):
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive
{
internal sealed class OutputPump<T> : IDisposable
{
private readonly IEnumerator<T> _source;
private readonly IObserver<T> _target;
private readonly Task _thread;
private readonly WaitHandle _waitStart;
private long _eventsRead;
public OutputPump(IEnumerable<T> source, IObserver<T> target, WaitHandle waitStart)
{
_source = source.GetEnumerator();
_target = target;
_waitStart = waitStart;
_thread = Task.Run((Action)ThreadProc);
}
public void Dispose()
{
_waitStart.Dispose();
}
private void ThreadProc()
{
_waitStart.WaitOne();
while (true)
{
try
{
if (!_source.MoveNext())
break;
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception ex)
{
try
{
_target.OnError(ex);
}
catch
{
}
break;
}
_eventsRead++;
try
{
_target.OnNext(_source.Current);
}
catch (Exception ex)
{
_target.OnError(ex);
}
}
_target.OnCompleted();
}
}
}