public void TestDStreamJoin()

in csharp/AdapterTest/DStreamTest.cs [140:244]


        public void TestDStreamJoin()
        {
            // Exercises GroupWith, Join, LeftOuterJoin, RightOuterJoin and FullOuterJoin on two
            // filtered views of the same word-count stream:
            //   left  = word counts without "quick" and "lazy"
            //   right = word counts without "brown"
            // The expected record counts (9 / 6 / 7 / 8 / 9) and per-word counts (22 or 23) depend on
            // the data served by the stream proxy, which is not visible in this method
            // (presumably the mock proxy asserted below - verify before changing any expected value).
            // All equality assertions use the (expected, actual) argument order so that failure
            // messages report the two values the right way round.
            var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
            Assert.IsNotNull(ssc.streamingContextProxy as MockStreamingContextProxy);

            var lines = ssc.TextFileStream(Path.GetTempPath());
            Assert.IsNotNull(lines.DStreamProxy);

            var words = lines.FlatMap(l => l.Split(' '));

            var pairs = words.Map(w => new Tuple<string, int>(w, 1));

            var wordCounts = pairs.ReduceByKey((x, y) => x + y);

            var left = wordCounts.Filter(x => x.Item1 != "quick" && x.Item1 != "lazy");
            var right = wordCounts.Filter(x => x.Item1 != "brown");

            // GroupWith: every key from either side, with the list of values found on each side.
            var groupWith = left.GroupWith(right);
            groupWith.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(9, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, Tuple<List<int>, List<int>>>)record;
                    string word = countByWord.Item1;
                    List<int> leftCounts = countByWord.Item2.Item1;
                    List<int> rightCounts = countByWord.Item2.Item2;

                    if (word == "quick" || word == "lazy")
                    {
                        // Filtered out of the left stream, so nothing is grouped on that side.
                        Assert.AreEqual(0, leftCounts.Count);
                    }
                    else if (word == "brown")
                    {
                        // Filtered out of the right stream.
                        Assert.AreEqual(0, rightCounts.Count);
                    }
                    else
                    {
                        // Word is present on both sides; "lazy" cannot reach this branch, so both
                        // sides share the same expectation.
                        int expected = word == "The" || word == "dog" ? 23 : 22;
                        Assert.AreEqual(expected, leftCounts[0]);
                        Assert.AreEqual(expected, rightCounts[0]);
                    }
                }
            });

            // Inner join: only keys present on both sides.
            var innerJoin = left.Join(right);
            innerJoin.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(6, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, Tuple<int, int>>)record;
                    string word = countByWord.Item1;
                    int expected = word == "The" || word == "dog" ? 23 : 22;

                    Assert.AreEqual(expected, countByWord.Item2.Item1);
                    Assert.AreEqual(expected, countByWord.Item2.Item2);
                }
            });

            // Left outer join: every left key; the right value is optional.
            var leftOuterJoin = left.LeftOuterJoin(right);
            leftOuterJoin.ForeachRDD((time, rdd) =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(7, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, Tuple<int, Option<int>>>)record;
                    string word = countByWord.Item1;
                    int expected = word == "The" || word == "dog" ? 23 : 22;

                    Assert.AreEqual(expected, countByWord.Item2.Item1);

                    Option<int> rightCount = countByWord.Item2.Item2;
                    if (word == "brown")
                    {
                        // "brown" was filtered out of the right stream.
                        Assert.IsFalse(rightCount.IsDefined);
                    }
                    else
                    {
                        // Check definedness first so GetValue() is only called on a defined option.
                        Assert.IsTrue(rightCount.IsDefined);
                        Assert.AreEqual(expected, rightCount.GetValue());
                    }
                }
            });

            // Right outer join: every right key; the left value is optional.
            var rightOuterJoin = left.RightOuterJoin(right);
            rightOuterJoin.ForeachRDD(rdd =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(8, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, Tuple<Option<int>, int>>)record;
                    string word = countByWord.Item1;

                    Option<int> leftCount = countByWord.Item2.Item1;
                    if (word == "quick" || word == "lazy")
                    {
                        // Both words were filtered out of the left stream.
                        Assert.IsFalse(leftCount.IsDefined);
                    }
                    else
                    {
                        Assert.IsTrue(leftCount.IsDefined);
                        Assert.AreEqual(word == "The" || word == "dog" ? 23 : 22, leftCount.GetValue());
                    }

                    // "lazy" survives on the right side and is expected with a count of 23 there.
                    int expectedRight = word == "The" || word == "dog" || word == "lazy" ? 23 : 22;
                    Assert.AreEqual(expectedRight, countByWord.Item2.Item2);
                }
            });

            // Full outer join: every key from either side; both values are optional.
            var fullOuterJoin = left.FullOuterJoin(right);
            fullOuterJoin.ForeachRDD(rdd =>
            {
                var taken = rdd.Collect();
                Assert.AreEqual(9, taken.Length);

                foreach (object record in taken)
                {
                    var countByWord = (Tuple<string, Tuple<Option<int>, Option<int>>>)record;
                    string word = countByWord.Item1;

                    Option<int> leftCount = countByWord.Item2.Item1;
                    if (word == "quick" || word == "lazy")
                    {
                        Assert.IsFalse(leftCount.IsDefined);
                    }
                    else
                    {
                        Assert.IsTrue(leftCount.IsDefined);
                        Assert.AreEqual(word == "The" || word == "dog" ? 23 : 22, leftCount.GetValue());
                    }

                    Option<int> rightCount = countByWord.Item2.Item2;
                    if (word == "brown")
                    {
                        Assert.IsFalse(rightCount.IsDefined);
                    }
                    else
                    {
                        Assert.IsTrue(rightCount.IsDefined);
                        Assert.AreEqual(word == "The" || word == "dog" || word == "lazy" ? 23 : 22, rightCount.GetValue());
                    }
                }
            });
        }