in csharp/AdapterTest/TestWithMoqDemo.cs [37:137]
// Test setup: replaces SparkCLREnvironment.SparkCLRProxy with Moq mocks, wires the mocked
// SparkContext / StreamingContext / RDD proxies together, and creates the StreamingContext
// stored in _streamingContext. The mocked CreateCSharpRdd runs the serialized worker function
// in-process and stores its output in the 'result' field, which the mocked CollectAndServe
// later streams over a local socket.
public void TestInitialize()
{
    result = null;

    // Create a mock implementation of T with new Mock<T>().
    _mockSparkCLRProxy = new Mock<ISparkCLRProxy>();
    _mockSparkContextProxy = new Mock<ISparkContextProxy>();
    _mockStreamingContextProxy = new Mock<IStreamingContextProxy>();
    _mockRddProxy = new Mock<IRDDProxy>();
    SparkCLREnvironment.SparkCLRProxy = _mockSparkCLRProxy.Object;

    // Mock a method of T with Mock<T>.Setup(). For a method with parameters, you can provide different mock implementations for different
    // argument values: to mock the method regardless of the argument values, use It.IsAny<T>() for each parameter; to mock it only for
    // arguments matching certain criteria, use It.Is<T>(Func<T, bool>). The same method can be set up multiple times for different criteria.
    // If the method has a return value and you only want to mock that value, use Returns(TReturnValue); if you want to add logic and then
    // return, use Returns<T1, T2, ...>(Func<T1, T2, ..., TReturnValue>). If the method is void, use Callback<T1, T2, ...>(Action<T1, T2, ...>).
    // For more info please visit https://github.com/Moq/moq4/wiki/Quickstart
    _mockSparkCLRProxy.Setup(m => m.CreateSparkConf(It.IsAny<bool>())).Returns(new MockSparkConfProxy()); // mocks that rarely change can be kept here
    _mockSparkCLRProxy.Setup(m => m.CreateSparkContext(It.IsAny<ISparkConfProxy>())).Returns(_mockSparkContextProxy.Object);
    _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>())).Returns(_mockStreamingContextProxy.Object);

    // CollectAndServe: open a listening socket and, on a background task, write every element of
    // 'result' to the first accepted connection as a length-prefixed BinaryFormatter payload.
    // The returned SocketInfo carries the port the listener is bound to.
    _mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() =>
    {
        var listener = SocketFactory.CreateSocket();
        listener.Listen();

        Task.Run(() =>
        {
            using (var socket = listener.Accept())
            using (var ns = socket.GetStream())
            {
                foreach (var item in result)
                {
                    byte[] buffer;
                    // Dispose the scratch stream once the serialized bytes have been copied out.
                    using (var ms = new MemoryStream())
                    {
                        new BinaryFormatter().Serialize(ms, item);
                        buffer = ms.ToArray();
                    }

                    SerDe.Write(ns, buffer.Length);
                    SerDe.Write(ns, buffer);
                }

                ns.Flush();
            }
        });

        // Direct cast: an unexpected endpoint type fails with InvalidCastException instead of a
        // NullReferenceException from dereferencing a failed 'as' cast.
        var localEndPoint = (IPEndPoint)listener.LocalEndPoint;
        return new SocketInfo(localEndPoint.Port, null);
    });

    _mockRddProxy.Setup(m => m.RDDCollector).Returns(new RDDCollector());

    // CreateCSharpRdd: decode the worker function from 'command', apply it to the current 'result'
    // (or to the default sentences below when 'result' is null), store the output back into
    // 'result', and hand back the shared mocked RDD proxy.
    _mockSparkContextProxy.Setup(m => m.CreateCSharpRdd(It.IsAny<IRDDProxy>(), It.IsAny<byte[]>(), It.IsAny<Dictionary<string, string>>(),
        It.IsAny<List<string>>(), It.IsAny<bool>(), It.IsAny<List<Broadcast>>(), It.IsAny<List<byte[]>>()))
        .Returns<IRDDProxy, byte[], Dictionary<string, string>, List<string>, bool, List<Broadcast>, List<byte[]>>(
        (prefvJavaRddReference, command, environmentVariables, cSharpIncludes, preservePartitioning, broadcastVariables, accumulator) =>
        {
            IEnumerable<dynamic> input = result ?? (new[] {
                "The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog",
                "The dog lazy"
            }).AsEnumerable();

            // NOTE(review): BinaryFormatter is obsolete in current .NET; it is kept here because the
            // serialized worker function inside 'command' is read with it.
            var formatter = new BinaryFormatter();
            using (MemoryStream s = new MemoryStream(command))
            {
                // The header values are not needed by the mock, but each read must still happen to
                // advance the stream to the serialized worker function. The original code named the
                // three ints rddId, stageId and partitionId.
                SerDe.ReadInt(s);
                SerDe.ReadInt(s);
                SerDe.ReadInt(s);
                SerDe.ReadString(s);
                SerDe.ReadString(s);
                string runMode = SerDe.ReadString(s);
                if ("R".Equals(runMode, StringComparison.InvariantCultureIgnoreCase))
                {
                    // In "R" run mode one extra string precedes the function (the original code
                    // named it compilationDumpDir); skip it.
                    SerDe.ReadString(s);
                }

                CSharpWorkerFunc workerFunc;
                using (var funcStream = new MemoryStream(SerDe.ReadBytes(s)))
                {
                    workerFunc = (CSharpWorkerFunc)formatter.Deserialize(funcStream);
                }

                var func = workerFunc.Func;
                result = func(default(int), input);
            }

            // When the output starts with an 8-byte array, drop every 8-byte array and deserialize
            // the remaining byte arrays back into objects.
            // NOTE(review): the 8-byte entries are presumably marker records rather than payloads - confirm.
            if (result.FirstOrDefault() is byte[] && (result.First() as byte[]).Length == 8)
            {
                result = result.Where(e => (e as byte[]).Length != 8).Select(e =>
                {
                    using (var payload = new MemoryStream(e as byte[]))
                    {
                        return formatter.Deserialize(payload);
                    }
                });
            }

            return _mockRddProxy.Object;
        });

    _streamingContext = new StreamingContext(new SparkContext("", ""), 1000L);
}