in csharp/AdapterTest/DStreamTest.cs [274:329]
public void TestDStreamUpdateStateByKey()
{
    // Exercises key/value DStream transformations and UpdateStateByKey (with and without an
    // initial state RDD) against the mock streaming context proxy.
    // NOTE(review): the expected values (9 distinct words; 23 occurrences of "The"/"dog"/"lazy",
    // 22 of every other word) presumably come from the mock proxy's canned input - confirm there.
    // The lambdas below deliberately capture no locals, since delegates handed to the DStream API
    // may be serialized by the proxy.
    var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
    Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
    var lines = ssc.TextFileStream(Path.GetTempPath());
    Assert.IsNotNull(lines.DStreamProxy);
    var words = lines.FlatMap(l => l.Split(' '));
    var pairs = words.Map(w => new Tuple<string, int>(w, 1));

    // Group, flatten back out, double each 1, then re-reduce: every word count comes out doubled.
    var doubleCounts = pairs.GroupByKey().FlatMapValues(vs => vs).MapValues(v => 2 * v).ReduceByKey((x, y) => x + y);
    doubleCounts.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        // Assert.AreEqual takes (expected, actual); keep that order so failure messages read correctly.
        Assert.AreEqual(9, taken.Length);
        foreach (object record in taken)
        {
            Tuple<string, int> countByWord = (Tuple<string, int>)record;
            Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
        }
    });

    // Cache pairs to disable pipelining into UpdateStateByKey, which relies on checkpointing
    // that the mock proxy doesn't support.
    pairs.Cache();

    // Seed the state with a key ("AAA") that only exists in the initial state RDD.
    var initialStateRdd = ssc.SparkContext.Parallelize(new[] { "AAA" }).Map(w => new Tuple<string, int>("AAA", 22));

    // The update function adds the number of new values for a key to its running state.
    // A direct cast (rather than `as`) fails with a descriptive InvalidCastException instead of
    // a NullReferenceException if the values ever arrive as something other than a List<int>.
    var state = pairs.UpdateStateByKey<string, int, int>((v, s) => s + ((List<int>)v).Count, initialStateRdd);
    state.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        // The 9 words from the stream plus the seeded "AAA" key.
        Assert.AreEqual(10, taken.Length);
        foreach (object record in taken)
        {
            Tuple<string, int> countByWord = (Tuple<string, int>)record;
            // "AAA" keeps its seeded value of 22, so it shares the "every other word" branch.
            Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 23 : 22, countByWord.Item2);
        }
    });

    // Same update without an initial state RDD: only keys seen in the stream are present.
    var state2 = pairs.UpdateStateByKey<string, int, int>((v, s) => s + ((List<int>)v).Count);
    state2.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(9, taken.Length);
        foreach (object record in taken)
        {
            Tuple<string, int> countByWord = (Tuple<string, int>)record;
            Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 23 : 22, countByWord.Item2);
        }
    });
}