ILRepack.IntegrationTests/Helpers/ObservableProcess.cs (120 lines of code) (raw):
using System;
using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Text;
namespace ILRepack.IntegrationTests.Helpers
{
// Shameless copy from the ultimate Rx fu master:
// https://github.com/paulcbetts/peasant/blob/master/Peasant/Helpers/ObservableProcess.cs
/// <summary>
/// Starts a child process with stdin, stdout and stderr redirected and exposes it through Rx:
/// subscribing to the instance yields the exit code, <see cref="Output"/> replays every line
/// written to stdout/stderr, and <see cref="Input"/> forwards lines to the child's stdin.
/// </summary>
public class ObservableProcess : IObservable<int>
{
    // Upper bound, in milliseconds, on the initial wait for the child process to exit.
    const int ExitTimeoutMilliseconds = 60 * 1000;

    // Signals the exit code (or an error) exactly once, even to late subscribers.
    readonly AsyncSubject<int> exit = new AsyncSubject<int>();

    // Serializes OnNext calls coming from the stdout and stderr reader callbacks.
    readonly object gate = new object();

    // Unbounded replay buffer: every captured line is kept so the full output can be
    // turned into an error message when the process fails.
    readonly ReplaySubject<string> output = new ReplaySubject<string>();

    readonly Process process;
    readonly IObserver<string> input;

    /// <summary>
    /// Starts the process described by <paramref name="startInfo"/> and begins monitoring it
    /// on <see cref="Scheduler.Default"/>.
    /// </summary>
    /// <param name="startInfo">
    /// Process description. It is modified in place: all three standard streams are redirected
    /// and <c>UseShellExecute</c> is forced to <c>false</c>, which redirection requires.
    /// </param>
    /// <param name="throwOnNonZeroExitCode">
    /// When <c>true</c>, a non-zero exit code is reported to subscribers as an
    /// <see cref="Exception"/> whose message is the captured output; when <c>false</c> the
    /// exit code is delivered as a normal value.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="startInfo"/> is null.</exception>
    public ObservableProcess(ProcessStartInfo startInfo, bool throwOnNonZeroExitCode = true)
    {
        if (startInfo == null) throw new ArgumentNullException("startInfo");

        // Process.Start() throws if streams are redirected while UseShellExecute is true.
        startInfo.UseShellExecute = false;
        startInfo.RedirectStandardError = startInfo.RedirectStandardOutput = startInfo.RedirectStandardInput = true;

        process = new Process { StartInfo = startInfo, EnableRaisingEvents = true };
        process.OutputDataReceived += OnReceived;
        process.ErrorDataReceived += OnReceived;
        process.Start();
        process.BeginOutputReadLine();
        process.BeginErrorReadLine();

        input = Observer.Create<string>(
            x => { process.StandardInput.WriteLine(x); process.StandardInput.Flush(); },
            () => { });

        Observable.Start(() => MonitorExit(throwOnNonZeroExitCode), Scheduler.Default);
    }

    /// <summary>Observer whose OnNext writes a line to the child's standard input and flushes it.</summary>
    public IObserver<string> Input
    {
        get { return input; }
    }

    /// <summary>Replayed sequence of every non-null line received on stdout or stderr.</summary>
    public IObservable<string> Output
    {
        get { return output; }
    }

    /// <summary>
    /// Subscribes to the process result: the exit code followed by completion, or an error when
    /// the process failed (see the constructor's <c>throwOnNonZeroExitCode</c>) or could not be
    /// monitored to completion.
    /// </summary>
    public IDisposable Subscribe(IObserver<int> observer)
    {
        return exit.Subscribe(observer);
    }

    /// <summary>
    /// Kills the child process.
    /// NOTE(review): the monitor calls Process.Close() once it is done, after which this call
    /// presumably throws InvalidOperationException - confirm against callers.
    /// </summary>
    public void Kill()
    {
        process.Kill();
    }

    /// <summary>
    /// Operating-system id of the child process.
    /// NOTE(review): likely unavailable after the monitor has closed the Process - confirm.
    /// </summary>
    public int ProcessId
    {
        get { return process.Id; }
    }

    // Background worker: waits for the process, drains its output, releases it, and publishes
    // the result on `exit`. Any failure while waiting is reported through `exit` instead of
    // being lost, so subscribers never wait forever.
    void MonitorExit(bool throwOnNonZeroExitCode)
    {
        int exitCode;
        try
        {
            try
            {
                bool exited = process.WaitForExit(ExitTimeoutMilliseconds);

                // recreate flush logic from System.Diagnostics.Process
                WaitUntilEndOfFile("output");
                WaitUntilEndOfFile("error");

                // Reading ExitCode of a running process throws; fail with a clear message instead.
                if (!exited && !process.HasExited)
                {
                    throw new TimeoutException(
                        "Process did not exit within " + ExitTimeoutMilliseconds + " ms.");
                }

                exitCode = process.ExitCode;
            }
            finally
            {
                // Always detach the handlers and release the process handle.
                process.OutputDataReceived -= OnReceived;
                process.ErrorDataReceived -= OnReceived;
                process.Close();
            }
        }
        catch (Exception ex)
        {
            output.OnCompleted();
            exit.OnError(ex);
            return;
        }

        output.OnCompleted();
        if (exitCode != 0 && throwOnNonZeroExitCode)
        {
            // `output` is completed, so collecting it cannot block.
            var error = string.Join("\n", output.ToArray().First());
            exit.OnError(new Exception(error));
        }
        else
        {
            exit.OnNext(exitCode);
            exit.OnCompleted();
        }
    }

    // Shared handler for stdout and stderr lines; a null Data marks end-of-stream and is ignored.
    void OnReceived(object s, DataReceivedEventArgs e)
    {
        if (e.Data == null) return;
        lock (gate)
        {
            output.OnNext(ReparseAsciiDataAsUtf8(e.Data));
        }
    }

    // Blocks until the asynchronous reader stored in the named private field of Process has
    // reached end-of-file. Silently does nothing when the field, its value, or the method is
    // not present on the current runtime.
    void WaitUntilEndOfFile(string field)
    {
        var fi = process.GetType().GetField(field, BindingFlags.NonPublic | BindingFlags.Instance);
        if (fi == null) return;

        var sr = fi.GetValue(process);
        if (sr == null) return;

        // "WaitUtilEOF" (sic) is the spelling used by the .NET Framework's internal
        // AsyncStreamReader; do not "correct" it.
        var m = sr.GetType().GetMethod("WaitUtilEOF", BindingFlags.NonPublic | BindingFlags.Instance);
        if (m != null)
        {
            m.Invoke(sr, null);
        }
    }

    // Treats each char of `input` as raw bytes (low byte first, then the high byte when it is
    // non-zero) and decodes the resulting byte sequence as UTF-8. Null or empty input is
    // returned unchanged.
    static string ReparseAsciiDataAsUtf8(string input)
    {
        if (String.IsNullOrEmpty(input)) return input;

        // Worst case: two bytes per UTF-16 code unit.
        var bytes = new byte[input.Length * 2];
        int i = 0;
        foreach (char c in input)
        {
            bytes[i++] = (byte)(c & 0xFF);

            // High byte of the code unit. The previous `c & 0xFF00 >> 16` always evaluated
            // to 0 because `>>` binds tighter than `&`.
            var msb = (byte)(c >> 8);
            if (msb > 0)
            {
                bytes[i++] = msb;
            }
        }
        return Encoding.UTF8.GetString(bytes, 0, i);
    }
}
}