in csharp/AdapterTest/DStreamTest.cs [84:137]
/// <summary>
/// Builds a word-count pipeline (FlatMap -> Map -> keyed aggregation) over a
/// text-file DStream backed by <c>MockStreamingContextProxy</c>, and checks the
/// per-batch output of <c>ReduceByKey</c>, <c>GroupByKey</c> and
/// <c>ReduceByKeyAndWindow</c>.
/// </summary>
/// <remarks>
/// The expected figures (9 distinct words per batch; 23 occurrences of "The",
/// "dog" and "lazy" and 22 of every other word) are assumed to match the fixture
/// text served by the mock proxy. NOTE(review): confirm against the mock's data.
/// </remarks>
public void TestDStreamTransform()
{
    var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
    Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

    var lines = ssc.TextFileStream(Path.GetTempPath());
    Assert.IsNotNull(lines.DStreamProxy);

    // Number of distinct words expected in every collected batch.
    const int distinctWords = 9;

    // Expected occurrences of a word within a single batch. This is the one
    // source of truth shared by all three checks below.
    Func<string, int> expectedCount =
        word => word == "The" || word == "dog" || word == "lazy" ? 23 : 22;

    var words = lines.FlatMap(l => l.Split(' '));
    var pairs = words.Map(w => new Tuple<string, int>(w, 1));

    // Summed count per word.
    var wordCounts = pairs.PartitionBy().ReduceByKey((x, y) => x + y);
    wordCounts.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        // Assert.AreEqual takes (expected, actual); keep that order so failure messages read correctly.
        Assert.AreEqual(distinctWords, taken.Length);
        foreach (object record in taken)
        {
            Tuple<string, int> countByWord = (Tuple<string, int>)record;
            Assert.AreEqual(expectedCount(countByWord.Item1), countByWord.Item2);
        }
    });

    // Grouped occurrences per word: the list length must equal the summed count.
    var wordLists = pairs.GroupByKey();
    wordLists.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(distinctWords, taken.Length);
        foreach (object record in taken)
        {
            Tuple<string, List<int>> countByWord = (Tuple<string, List<int>>)record;
            Assert.AreEqual(expectedCount(countByWord.Item1), countByWord.Item2.Count);
        }
    });

    // Windowed sum with an inverse (subtract) function. The original expectations
    // (46/44) are exactly twice the per-batch counts. NOTE(review): this presumably
    // reflects how the mock evaluates the window; confirm against MockStreamingContextProxy.
    var wordCountsByWindow = pairs.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, 1);
    wordCountsByWindow.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(distinctWords, taken.Length);
        foreach (object record in taken)
        {
            Tuple<string, int> countByWord = (Tuple<string, int>)record;
            Assert.AreEqual(2 * expectedCount(countByWord.Item1), countByWord.Item2);
        }
    });
}