public void TestDStreamUpdateStateByKey()

in csharp/AdapterTest/DStreamTest.cs [274:329]


        public void TestDStreamUpdateStateByKey()
        {
            // Exercises DStream.UpdateStateByKey, both with and without an initial state RDD,
            // and checks the per-word counts delivered to ForeachRDD.
            // NOTE(review): the expected counts (23 for "The"/"dog"/"lazy", 22 for every other word,
            // 9 distinct words) presumably mirror the fixed data produced by the mock proxy - confirm.
            var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
            Assert.IsNotNull(ssc.streamingContextProxy as MockStreamingContextProxy);

            var lines = ssc.TextFileStream(Path.GetTempPath());
            Assert.IsNotNull(lines.DStreamProxy);

            var words = lines.FlatMap(l => l.Split(' '));

            var pairs = words.Map(w => new Tuple<string, int>(w, 1));

            // Group, flatten back, double each value and sum per key: every word ends up with twice its count.
            var doubleCounts = pairs.GroupByKey().FlatMapValues(vs => vs).MapValues(v => 2 * v).ReduceByKey((x, y) => x + y);
            doubleCounts.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(9, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, int>)record;
                    int expected = countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22;
                    Assert.AreEqual(expected, countByWord.Item2);
                }
            });

            // Cache the pairs to disable pipelining into UpdateStateByKey, which relies on
            // checkpointing that the mock proxy doesn't support.
            pairs.Cache();

            // The initial state contributes one extra key ("AAA" -> 22), hence 10 records below instead of 9.
            var initialStateRdd = ssc.SparkContext.Parallelize(new[] { "AAA" }).Map(w => new Tuple<string, int>("AAA", 22));

            // New state = previous state + number of values received for the key.
            // A direct cast is used so that an unexpected value type fails with InvalidCastException
            // instead of a NullReferenceException from a failed 'as' conversion.
            var state = pairs.UpdateStateByKey<string, int, int>((v, s) => s + ((List<int>)v).Count, initialStateRdd);
            state.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(10, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, int>)record;
                    int expected = countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 23 : 22;
                    Assert.AreEqual(expected, countByWord.Item2);
                }
            });

            // Same update function, but without an initial state RDD: only the 9 stream keys are expected.
            var state2 = pairs.UpdateStateByKey<string, int, int>((v, s) => s + ((List<int>)v).Count);
            state2.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(9, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, int>)record;
                    int expected = countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 23 : 22;
                    Assert.AreEqual(expected, countByWord.Item2);
                }
            });
        }