in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [386:434]
/**
 * Registers the checkers that validate {@link ScheduleGroupProperty} assignments.
 *
 * <p>Two checkers are added:
 * <ul>
 *   <li>a global DAG checker that, for every vertex carrying a schedule group, traverses the DAG
 *       from that vertex via {@code dfsDo} and fails if any visited vertex carries a strictly
 *       smaller schedule group;</li>
 *   <li>a single-edge checker that fails when a non-control edge whose data flow is
 *       {@code PULL} connects two vertices that carry the same schedule group.</li>
 * </ul>
 *
 * <p>Vertices without a schedule group are ignored by both checkers.
 */
void addScheduleGroupCheckers() {
  final GlobalDAGChecker scheduleGroupTopoOrdering = (irdag -> {
    for (final IRVertex v : irdag.getVertices()) {
      // Holder written from inside the traversal lambda; stays null when no violation is seen.
      final MutableObject<IRVertex> violatingReachableVertex = new MutableObject<>();
      v.getPropertyValue(ScheduleGroupProperty.class).ifPresent(startingScheduleGroup ->
        irdag.dfsDo(
          v,
          // Every violating vertex overwrites the holder, so the last one visited is reported.
          visited -> visited.getPropertyValue(ScheduleGroupProperty.class)
            .filter(visitedScheduleGroup -> visitedScheduleGroup < startingScheduleGroup)
            .ifPresent(smallerScheduleGroup -> violatingReachableVertex.setValue(visited)),
          DAGInterface.TraversalOrder.PreOrder,
          new HashSet<>()));
      if (violatingReachableVertex.getValue() != null) {
        return failure(
          "A reachable vertex with a smaller schedule group ",
          v,
          ScheduleGroupProperty.class,
          violatingReachableVertex.getValue(),
          ScheduleGroupProperty.class);
      }
    }
    return success();
  });
  globalDAGCheckerList.add(scheduleGroupTopoOrdering);

  final SingleEdgeChecker splitByPull = (edge -> {
    // Control edges are exempt from this rule.
    if (Util.isControlEdge(edge)) {
      return success();
    }
    if (Optional.of(DataFlowProperty.Value.PULL).equals(edge.getPropertyValue(DataFlowProperty.class))) {
      final Optional<Integer> srcSG = edge.getSrc().getPropertyValue(ScheduleGroupProperty.class);
      final Optional<Integer> dstSG = edge.getDst().getPropertyValue(ScheduleGroupProperty.class);
      // Only a violation when both endpoints carry a schedule group and the groups are equal;
      // the isPresent() guard keeps two empty Optionals from comparing as equal.
      if (srcSG.isPresent() && srcSG.equals(dstSG)) {
        return failure("Schedule group must split by Pull",
          edge.getSrc(), ScheduleGroupProperty.class, edge.getDst(), ScheduleGroupProperty.class);
      }
    }
    return success();
  });
  singleEdgeCheckerList.add(splitByPull);
}