in common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java [314:365]
public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder) {
  // Unrolls one iteration of this loop into dagBuilder:
  //   1. clone every vertex of the loop body and re-create the body's internal edges between the clones,
  //   2. attach the current DAG incoming edges to the clones,
  //   3. if the loop is finished, attach the clones to the destinations of the DAG outgoing edges,
  //   4. rebuild this vertex's DAG incoming edges so they describe the following iteration.
  // Returns this same LoopVertex instance, mutated in place.
  final HashMap<IRVertex, IRVertex> clones = new HashMap<>();
  final DAG<IRVertex, IREdge> loopBody = getDAG();

  // The iteration counter is updated before loopTerminationConditionMet() is consulted further down;
  // NOTE(review): presumably the termination check reads this counter - confirm against its definition.
  decreaseMaxNumberOfIterations();

  // Step 1: walking in topological order means that, by the time a vertex is visited, the sources of its
  // incoming edges have already been cloned and registered in the map.
  loopBody.topologicalDo(original -> {
    final IRVertex clone = original.getClone();
    clones.putIfAbsent(original, clone);
    dagBuilder.addVertex(clone, loopBody);
    loopBody.getIncomingEdgesOf(original).forEach(innerEdge ->
      dagBuilder.connectVertices(duplicateEdge(innerEdge, clones.get(innerEdge.getSrc()), clone)));
  });

  // Step 2: edges entering the loop keep their original source and are redirected to the cloned destination.
  getDagIncomingEdges().forEach((loopEntry, edgesIntoLoop) -> {
    final IRVertex entryClone = clones.get(loopEntry);
    edgesIntoLoop.forEach(external ->
      dagBuilder.connectVertices(duplicateEdge(external, external.getSrc(), entryClone)));
  });

  // Step 3: outgoing edges are only materialized once the termination condition holds; otherwise they are
  // left alone on this pass.
  if (loopTerminationConditionMet()) {
    getDagOutgoingEdges().forEach((loopExit, edgesOutOfLoop) -> {
      final IRVertex exitClone = clones.get(loopExit);
      edgesOutOfLoop.forEach(external -> {
        final IRVertex consumer = external.getDst();
        final IREdge rewired = duplicateEdge(external, exitClone, consumer);
        // The destination vertex is added to the builder before the edge is connected to it.
        dagBuilder.addVertex(consumer).connectVertices(rewired);
      });
    });
  }

  // Step 4: drop the incoming edges that were just consumed and register the ones for the next pass.
  // Non-iterative edges are re-added unchanged; iterative edges are re-created so that their source is the
  // clone produced in this pass.
  getDagIncomingEdges().clear();
  nonIterativeIncomingEdges.forEach((entryVertex, unchangedEdges) ->
    unchangedEdges.forEach(this::addDagIncomingEdge));
  iterativeIncomingEdges.forEach((entryVertex, backEdges) -> backEdges.forEach(backEdge ->
    addDagIncomingEdge(duplicateEdge(backEdge, clones.get(backEdge.getSrc()), entryVertex))));

  return this;
}

/**
 * Creates a new edge between the given endpoints that carries the same communication pattern as the
 * template edge, then copies the template's execution properties onto it.
 * The communication pattern is read with an unchecked {@code get()}, so the template is expected to have one.
 *
 * @param template the edge whose communication pattern and execution properties are reused.
 * @param src      the source vertex of the new edge.
 * @param dst      the destination vertex of the new edge.
 * @return the newly created edge.
 */
private static IREdge duplicateEdge(final IREdge template, final IRVertex src, final IRVertex dst) {
  final IREdge duplicate =
    new IREdge(template.getPropertyValue(CommunicationPatternProperty.class).get(), src, dst);
  template.copyExecutionPropertiesTo(duplicate);
  return duplicate;
}