in csharp/AdapterTest/DStreamTest.cs [140:244]
// Exercises GroupWith, Join, LeftOuterJoin, RightOuterJoin and FullOuterJoin on two
// filtered views of the same word-count stream:
//   left  = word counts without "quick" and "lazy"
//   right = word counts without "brown"
// Per-word counts asserted below: 23 for "The" and "dog" (and for "lazy", which is
// only present on the right side), 22 for every other word.
// All Assert.AreEqual calls use the (expected, actual) argument order so that failure
// messages report the values the right way round.
public void TestDStreamJoin()
{
    var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
    Assert.IsNotNull(ssc.streamingContextProxy as MockStreamingContextProxy);

    var lines = ssc.TextFileStream(Path.GetTempPath());
    Assert.IsNotNull(lines.DStreamProxy);

    var words = lines.FlatMap(l => l.Split(' '));
    var pairs = words.Map(w => new Tuple<string, int>(w, 1));
    var wordCounts = pairs.ReduceByKey((x, y) => x + y);

    var left = wordCounts.Filter(x => x.Item1 != "quick" && x.Item1 != "lazy");
    var right = wordCounts.Filter(x => x.Item1 != "brown");

    // GroupWith: every key from either side, each with the list of values per side.
    var groupWith = left.GroupWith(right);
    groupWith.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(9, taken.Length);

        foreach (object record in taken)
        {
            var countByWord = (Tuple<string, Tuple<List<int>, List<int>>>)record;
            string word = countByWord.Item1;

            if (word == "quick" || word == "lazy")
            {
                // Filtered out of the left side, so its left group must be empty.
                Assert.AreEqual(0, countByWord.Item2.Item1.Count);
            }
            else if (word == "brown")
            {
                // Filtered out of the right side, so its right group must be empty.
                Assert.AreEqual(0, countByWord.Item2.Item2.Count);
            }
            else
            {
                // "lazy" never reaches this branch, so both sides share one expectation.
                int expected = word == "The" || word == "dog" ? 23 : 22;
                Assert.AreEqual(expected, countByWord.Item2.Item1[0]);
                Assert.AreEqual(expected, countByWord.Item2.Item2[0]);
            }
        }
    });

    // Inner join: only keys present on both sides.
    var innerJoin = left.Join(right);
    innerJoin.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(6, taken.Length);

        foreach (object record in taken)
        {
            var countByWord = (Tuple<string, Tuple<int, int>>)record;
            string word = countByWord.Item1;
            int expected = word == "The" || word == "dog" ? 23 : 22;

            Assert.AreEqual(expected, countByWord.Item2.Item1);
            Assert.AreEqual(expected, countByWord.Item2.Item2);
        }
    });

    // Left outer join: every left key; the right value is optional.
    var leftOuterJoin = left.LeftOuterJoin(right);
    leftOuterJoin.ForeachRDD((time, rdd) =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(7, taken.Length);

        foreach (object record in taken)
        {
            var countByWord = (Tuple<string, Tuple<int, Option<int>>>)record;
            string word = countByWord.Item1;
            int expected = word == "The" || word == "dog" ? 23 : 22;
            Option<int> rightCount = countByWord.Item2.Item2;

            Assert.AreEqual(expected, countByWord.Item2.Item1);

            if (word == "brown")
            {
                // "brown" is absent from the right side.
                Assert.IsFalse(rightCount.IsDefined);
            }
            else
            {
                Assert.IsTrue(rightCount.IsDefined);
                Assert.AreEqual(expected, rightCount.GetValue());
            }
        }
    });

    // Right outer join: every right key; the left value is optional.
    var rightOuterJoin = left.RightOuterJoin(right);
    rightOuterJoin.ForeachRDD(rdd =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(8, taken.Length);

        foreach (object record in taken)
        {
            var countByWord = (Tuple<string, Tuple<Option<int>, int>>)record;
            string word = countByWord.Item1;
            bool isTheOrDog = word == "The" || word == "dog";
            int expectedLeft = isTheOrDog ? 23 : 22;
            int expectedRight = isTheOrDog || word == "lazy" ? 23 : 22;
            Option<int> leftCount = countByWord.Item2.Item1;

            if (word == "quick" || word == "lazy")
            {
                // "quick" and "lazy" are absent from the left side.
                Assert.IsFalse(leftCount.IsDefined);
            }
            else
            {
                Assert.IsTrue(leftCount.IsDefined);
                Assert.AreEqual(expectedLeft, leftCount.GetValue());
            }

            Assert.AreEqual(expectedRight, countByWord.Item2.Item2);
        }
    });

    // Full outer join: every key from either side; both values are optional.
    var fullOuterJoin = left.FullOuterJoin(right);
    fullOuterJoin.ForeachRDD(rdd =>
    {
        var taken = rdd.Collect();
        Assert.AreEqual(9, taken.Length);

        foreach (object record in taken)
        {
            var countByWord = (Tuple<string, Tuple<Option<int>, Option<int>>>)record;
            string word = countByWord.Item1;
            bool isTheOrDog = word == "The" || word == "dog";
            int expectedLeft = isTheOrDog ? 23 : 22;
            int expectedRight = isTheOrDog || word == "lazy" ? 23 : 22;
            Option<int> leftCount = countByWord.Item2.Item1;
            Option<int> rightCount = countByWord.Item2.Item2;

            if (word == "quick" || word == "lazy")
            {
                // "quick" and "lazy" are absent from the left side.
                Assert.IsFalse(leftCount.IsDefined);
            }
            else
            {
                Assert.IsTrue(leftCount.IsDefined);
                Assert.AreEqual(expectedLeft, leftCount.GetValue());
            }

            if (word == "brown")
            {
                // "brown" is absent from the right side.
                Assert.IsFalse(rightCount.IsDefined);
            }
            else
            {
                Assert.IsTrue(rightCount.IsDefined);
                Assert.AreEqual(expectedRight, rightCount.GetValue());
            }
        }
    });
}