public void TestDStreamTransform()

in csharp/AdapterTest/DStreamTest.cs [84:137]


        public void TestDStreamTransform()
        {
            var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
            Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

            var lines = ssc.TextFileStream(Path.GetTempPath());
            Assert.IsNotNull(lines.DStreamProxy);

            var words = lines.FlatMap(l => l.Split(' '));

            var pairs = words.Map(w => new Tuple<string, int>(w, 1));

            var wordCounts = pairs.PartitionBy().ReduceByKey((x, y) => x + y);

            wordCounts.ForeachRDD((time, rdd) => 
                {
                    var taken = rdd.Collect();
                    Assert.AreEqual(taken.Length, 9);

                    foreach (object record in taken)
                    {
                        Tuple<string, int> countByWord = (Tuple<string, int>)record;
                        Assert.AreEqual(countByWord.Item2, countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 23 : 22);
                    }
                });

            var wordLists = pairs.GroupByKey();

            wordLists.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(taken.Length, 9);

                foreach (object record in taken)
                {
                    Tuple<string, List<int>> countByWord = (Tuple<string, List<int>>)record;
                    Assert.AreEqual(countByWord.Item2.Count, countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 23 : 22);
                }
            });

            var wordCountsByWindow = pairs.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, 1);

            wordCountsByWindow.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(taken.Length, 9);

                foreach (object record in taken)
                {
                    Tuple<string, int> countByWord = (Tuple<string, int>)record;
                    Assert.AreEqual(countByWord.Item2, countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 46 : 44);
                }
            });
        }