in wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java [42:105]
/**
 * Entry point of the WordCount application.
 *
 * <p>Expects two arguments: a comma-separated list of platforms ({@code java} and/or
 * {@code spark}) and the URL of the input text file. The file is split into words,
 * the words are lower-cased and counted, and the counts are printed to standard output.
 *
 * <p>Exit codes: {@code 1} if arguments are missing, {@code 3} if an unknown platform
 * is named, {@code 4} if building or executing the plan fails.
 *
 * @param args {@code args[0]} is the platform list, {@code args[1]} the input file URL
 * @throws IOException declared for interface compatibility; failures inside the plan are
 *                     caught and reported within this method
 * @throws URISyntaxException declared for interface compatibility
 */
public static void main(String[] args) throws IOException, URISyntaxException {
    // Both the platform list (args[0]) and the input URL (args[1]) are required;
    // checking only for "no arguments" would let a missing URL surface later as an
    // ArrayIndexOutOfBoundsException instead of a usage message.
    if (args.length < 2) {
        System.err.println("Usage: <platform1>[,<platform2>]* <input file URL>");
        System.exit(1);
        return;
    }

    try {
        WayangContext wayangContext = new WayangContext();
        for (String platform : args[0].split(",")) {
            switch (platform) {
                case "java":
                    wayangContext.register(Java.basicPlugin());
                    break;
                case "spark":
                    wayangContext.register(Spark.basicPlugin());
                    break;
                default:
                    System.err.format("Unknown platform: \"%s\"\n", platform);
                    System.exit(3);
                    return;
            }
        }

        /* Get a plan builder */
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName("WordCount")
                .withUdfJarOf(Main.class);

        /* Start building the Apache WayangPlan */
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                /* Read the text file */
                .readTextFile(args[1]).withName("Load file")

                /* Split each line by non-word characters */
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(1, 100, 0.9)
                .withName("Split words")

                /* Filter empty tokens (a line starting with a separator yields a leading "") */
                .filter(token -> !token.isEmpty())
                .withName("Filter empty words")

                /*
                 * Attach counter to each word. Locale.ROOT keeps the lower-casing independent
                 * of the JVM's default locale, so the same input yields the same keys everywhere.
                 */
                .map(word -> new Tuple2<>(word.toLowerCase(java.util.Locale.ROOT), 1))
                .withName("To lower case, add counter")

                /* Sum up counters for every word */
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withName("Add counters")

                /* Execute the plan and collect the results */
                .collect();

        System.out.printf("Found %d words:\n", wordcounts.size());
        wordcounts.forEach(wc -> System.out.printf("%dx %s\n", wc.field1, wc.field0));
    } catch (Exception e) {
        // Top-level boundary: report any failure and signal it through the exit code.
        System.err.println("App failed.");
        e.printStackTrace();
        System.exit(4);
    }
}