in wayang-benchmark/code/main/java/org/apache/wayang/apps/wordcount/Main.java [54:115]
/**
 * Builds a word-count {@link WayangPlan}: load a text file, split it into words, drop empty tokens,
 * lowercase each word, count occurrences per word, and collect the {@code (word, count)} pairs.
 *
 * @param inputFileUrl URL of the text file to read (passed to {@link TextFileSource})
 * @param collector    collection that the collecting sink fills with the {@code (word, count)} results
 * @return a plan whose single sink is the collecting sink
 * @throws URISyntaxException kept in the signature for compatibility with existing callers
 * @throws IOException        kept in the signature for compatibility with existing callers
 */
public static WayangPlan createWayangPlan(String inputFileUrl, Collection<Tuple2<String, Integer>> collector) throws URISyntaxException, IOException {
    // Source: read the input file line by line.
    TextFileSource textFileSource = new TextFileSource(inputFileUrl);
    textFileSource.setName("Load file");

    // For each line, emit the tokens separated by runs of non-word characters.
    // The interval is an estimate handed to the optimizer (presumably the number of words
    // emitted per input line, with 0.8 confidence -- TODO confirm against FlatMapDescriptor).
    FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<>(
            new FlatMapDescriptor<>(line -> Arrays.asList(line.split("\\W+")),
                    String.class,
                    String.class,
                    new ProbabilisticDoubleInterval(100, 10000, 0.8)
            )
    );
    flatMapOperator.setName("Split words");

    // split() yields an empty token for an empty line or a line starting with a non-word
    // character; such tokens are not words and must not be counted.
    FilterOperator<String> filterOperator = new FilterOperator<>(str -> !str.isEmpty(), String.class);
    filterOperator.setName("Filter empty words");

    // Lowercase each word and pair it with an initial count of 1. Locale.ROOT makes the result
    // independent of the JVM default locale (e.g. a Turkish locale would map "I" to dotless "ı").
    MapOperator<String, Tuple2<String, Integer>> mapOperator = new MapOperator<>(
            new TransformationDescriptor<>(word -> new Tuple2<>(word.toLowerCase(java.util.Locale.ROOT), 1),
                    DataUnitType.createBasic(String.class),
                    DataUnitType.createBasicUnchecked(Tuple2.class)
            ), DataSetType.createDefault(String.class),
            DataSetType.createDefaultUnchecked(Tuple2.class)
    );
    mapOperator.setName("To lower case, add counter");

    // Group by the word (field0) and add up the counts (field1). A fresh tuple is returned
    // instead of mutating the left operand, so the reduction has no side effects on its inputs
    // even if an execution platform reuses or caches the upstream tuples.
    ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new ReduceByOperator<>(
            new TransformationDescriptor<>(pair -> pair.field0,
                    DataUnitType.createBasicUnchecked(Tuple2.class),
                    DataUnitType.createBasic(String.class)),
            new ReduceDescriptor<>(
                    (a, b) -> new Tuple2<>(a.field0, a.field1 + b.field1),
                    DataUnitType.createGroupedUnchecked(Tuple2.class),
                    DataUnitType.createBasicUnchecked(Tuple2.class)
            ),
            DataSetType.createDefaultUnchecked(Tuple2.class)
    );
    reduceByOperator.setName("Add counters");

    // Sink: collect the (word, count) pairs into the caller-supplied collection.
    LocalCallbackSink<Tuple2<String, Integer>> sink = LocalCallbackSink.createCollectingSink(
            collector,
            DataSetType.createDefaultUnchecked(Tuple2.class)
    );
    sink.setName("Collect result");

    // Build the Wayang plan by connecting the operators output 0 -> input 0.
    textFileSource.connectTo(0, flatMapOperator, 0);
    flatMapOperator.connectTo(0, filterOperator, 0);
    filterOperator.connectTo(0, mapOperator, 0);
    mapOperator.connectTo(0, reduceByOperator, 0);
    reduceByOperator.connectTo(0, sink, 0);
    return new WayangPlan(sink);
}