in wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/mapping/PageRankMapping.java [83:252]
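// Expands a PageRankOperator into a subplan of basic Wayang operators: it derives the vertex set,
// builds per-vertex adjacency lists, and iteratively updates the ranks inside a RepeatOperator loop;
// the whole construct is finally wrapped into a Subplan that stands in for the original operator.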
private Operator createPageRankSubplan(PageRankOperator pageRankOperator, int epoch) {
final String operatorBaseName = pageRankOperator.getName() == null ?
"PageRank" :
pageRankOperator.getName();
// TODO: We only need this MapOperator because we cannot have a single Subplan InputSlot that maps to two
// inner InputSlots.
MapOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> forward = new MapOperator<>(
t -> t, ReflectionUtils.specify(Tuple2.class), ReflectionUtils.specify(Tuple2.class)
);
forward.at(epoch);
forward.setName(String.format("%s (forward)", operatorBaseName));
// Find all vertices.
FlatMapOperator<Tuple2<Long, Long>, Long> vertexExtractor = new FlatMapOperator<>(
new FlatMapDescriptor<>(
(FunctionDescriptor.SerializableFunction<Tuple2<Long, Long>, Iterable<Long>>) edge -> {
final List<Long> out = new ArrayList<>(2);
out.add(edge.field0);
out.add(edge.field1);
return out;
},
ReflectionUtils.specify(Tuple2.class), Long.class,
ProbabilisticDoubleInterval.ofExactly(2)
)
);
vertexExtractor.at(epoch);
vertexExtractor.setName(String.format("%s (extract vertices)", operatorBaseName));
forward.connectTo(0, vertexExtractor, 0);
// Get the distinct vertices.
DistinctOperator<Long> vertexDistincter = new DistinctOperator<>(Long.class);
vertexDistincter.at(epoch);
vertexDistincter.setName(String.format("%s (distinct vertices)", operatorBaseName));
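// Estimate the distinct-vertex count from the number of edge endpoints (two per edge),
// with NUM_VERTICES_PER_EDGE presumably denoting the assumed ratio of distinct vertices to edges.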
vertexDistincter.setCardinalityEstimator(0, new DefaultCardinalityEstimator(
0.5d, 1, false, longs -> Math.round(longs[0] * NUM_VERTICES_PER_EDGE / 2)
));
vertexExtractor.connectTo(0, vertexDistincter, 0);
// Count the vertices.
CountOperator<Long> vertexCounter = new CountOperator<>(Long.class);
vertexCounter.at(epoch);
vertexCounter.setName(String.format("%s (count vertices)", operatorBaseName));
vertexDistincter.connectTo(0, vertexCounter, 0);
// Create the adjacencies.
MapOperator<Tuple2<Long, Long>, Tuple2<Long, long[]>> adjacencyPreparator = new MapOperator<>(
t -> new Tuple2<>(t.field0, new long[]{t.field1}),
ReflectionUtils.specify(Tuple2.class),
ReflectionUtils.specify(Tuple2.class)
);
adjacencyPreparator.at(epoch);
adjacencyPreparator.setName(String.format("%s (prepare adjacencies)", operatorBaseName));
forward.connectTo(0, adjacencyPreparator, 0);
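// Group the prepared edges by their source vertex and concatenate the singleton target arrays
// into per-vertex adjacency lists.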
ReduceByOperator<Tuple2<Long, long[]>, Long> adjacencyCreator = new ReduceByOperator<>(
Tuple2::getField0,
(t1, t2) -> {
// NB: We deliberately keep duplicate target vertices, because duplicate edges should influence the PageRanks.
// That being said, a more efficient bag (multiset) representation might be preferable in some cases.
long[] targetVertices = new long[t1.field1.length + t2.field1.length];
System.arraycopy(t1.field1, 0, targetVertices, 0, t1.field1.length);
System.arraycopy(t2.field1, 0, targetVertices, t1.field1.length, t2.field1.length);
return new Tuple2<>(t1.field0, targetVertices);
},
ReflectionUtils.specify(Long.class),
ReflectionUtils.specify(Tuple2.class)
);
adjacencyCreator.at(epoch);
adjacencyCreator.setName(String.format("%s (create adjacencies)", operatorBaseName));
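// Expect roughly one adjacency list per distinct source vertex (again estimated via NUM_VERTICES_PER_EDGE).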
adjacencyCreator.setCardinalityEstimator(0, new DefaultCardinalityEstimator(
0.5d, 1, false, longs -> Math.round(longs[0] * NUM_VERTICES_PER_EDGE)
));
adjacencyPreparator.connectTo(0, adjacencyCreator, 0);
// Create the initial page ranks.
MapOperator<Long, Tuple2<Long, Float>> initializeRanks = new MapOperator<>(
new RankInitializer(),
Long.class, ReflectionUtils.specify(Tuple2.class)
);
initializeRanks.at(epoch);
initializeRanks.setName(String.format("%s (initialize ranks)", operatorBaseName));
vertexDistincter.connectTo(0, initializeRanks, 0);
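// Ship the total vertex count as the "numVertices" broadcast; RankInitializer presumably uses it
// to assign each vertex an initial rank of 1/numVertices.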
vertexCounter.broadcastTo(0, initializeRanks, "numVertices");
// Send the initial page ranks into the loop.
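// The RepeatOperator re-runs the loop body for the configured number of iterations.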
RepeatOperator<Tuple2<Long, Float>> loopHead = new RepeatOperator<>(
pageRankOperator.getNumIterations(), ReflectionUtils.specify(Tuple2.class)
);
loopHead.at(epoch);
loopHead.setName(String.format("%s (loop head)", operatorBaseName));
loopHead.initialize(initializeRanks, 0);
// Join adjacencies and current ranks.
JoinOperator<Tuple2<Long, long[]>, Tuple2<Long, Float>, Long> rankJoin =
new JoinOperator<>(
Tuple2::getField0,
Tuple2::getField0,
ReflectionUtils.specify(Tuple2.class),
ReflectionUtils.specify(Tuple2.class),
Long.class
);
rankJoin.at(epoch);
rankJoin.setName(String.format("%s (join adjacencies and ranks)", operatorBaseName));
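// The key-key join on vertex IDs should yield about one match per adjacency list.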
rankJoin.setCardinalityEstimator(0, new DefaultCardinalityEstimator(
0.99d, 2, false, longs -> longs[0]
));
adjacencyCreator.connectTo(0, rankJoin, 0);
loopHead.connectTo(RepeatOperator.ITERATION_OUTPUT_INDEX, rankJoin, 1);
// Create the new partial ranks.
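// Each vertex distributes its current rank evenly over its outgoing edges; a zero-valued surrogate
// tuple keeps vertices without incoming edges from disappearing.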
FlatMapOperator<Tuple2<Tuple2<Long, long[]>, Tuple2<Long, Float>>, Tuple2<Long, Float>> partialRankCreator =
new FlatMapOperator<>(new FlatMapDescriptor<>(
adjacencyAndRank -> {
Long sourceVertex = adjacencyAndRank.field0.field0;
final long[] targetVertices = adjacencyAndRank.field0.field1;
final float baseRank = adjacencyAndRank.field1.field1;
final Float partialRank = baseRank / targetVertices.length;
Collection<Tuple2<Long, Float>> partialRanks = new ArrayList<>(targetVertices.length + 1);
for (long targetVertex : targetVertices) {
partialRanks.add(new Tuple2<>(targetVertex, partialRank));
}
// Add a surrogate partial rank to avoid losing unreferenced vertices.
partialRanks.add(new Tuple2<>(sourceVertex, 0f));
return partialRanks;
},
ReflectionUtils.specify(Tuple2.class),
ReflectionUtils.specify(Tuple2.class),
ProbabilisticDoubleInterval.ofExactly(1d / NUM_VERTICES_PER_EDGE)
));
partialRankCreator.at(epoch);
partialRankCreator.setName(String.format("%s (create partial ranks)", operatorBaseName));
rankJoin.connectTo(0, partialRankCreator, 0);
// Sum the partial ranks.
ReduceByOperator<Tuple2<Long, Float>, Long> sumPartialRanks = new ReduceByOperator<>(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1),
Long.class,
ReflectionUtils.specify(Tuple2.class)
);
sumPartialRanks.at(epoch);
sumPartialRanks.setName(String.format("%s (sum partial ranks)", operatorBaseName));
sumPartialRanks.setCardinalityEstimator(0, new DefaultCardinalityEstimator(
0.5d, 1, false, longs -> Math.round(longs[0] * NUM_VERTICES_PER_EDGE)
));
partialRankCreator.connectTo(0, sumPartialRanks, 0);
// Apply the damping factor.
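// ApplyDamping presumably combines the summed partial ranks with the damping factor d and the
// broadcast vertex count N, e.g. as newRank = (1 - d) / N + d * summedRank.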
MapOperator<Tuple2<Long, Float>, Tuple2<Long, Float>> damping = new MapOperator<>(
new ApplyDamping(pageRankOperator.getDampingFactor()),
ReflectionUtils.specify(Tuple2.class),
ReflectionUtils.specify(Tuple2.class)
);
damping.at(epoch);
damping.setName(String.format("%s (damping)", operatorBaseName));
sumPartialRanks.connectTo(0, damping, 0);
vertexCounter.broadcastTo(0, damping, "numVertices");
loopHead.endIteration(damping, 0);
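// Collapse the loop into a single LoopSubplan and wrap the whole expansion into a Subplan
// whose slots stand in for those of the original PageRankOperator.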
final LoopSubplan loopSubplan = LoopIsolator.isolate(loopHead);
loopSubplan.at(epoch);
return Subplan.wrap(
Collections.singletonList(forward.getInput()),
Collections.singletonList(loopSubplan.getOutput(0)),
null
).at(epoch);
}