in common/src/main/java/org/apache/nemo/common/ir/vertex/utility/TaskSizeSplitterVertex.java [160:289]
public TaskSizeSplitterVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder) {
// Unrolls one iteration of this splitter's internal DAG into dagBuilder and then rewrites this
// vertex's DAG-incoming edge records so that they describe the following iteration.
//
// Steps, in the order they are executed:
//   1. On the first trial (testingTrial == 0) a new SignalVertex is inserted via insertSignalVertex.
//   2. getDAG() is read and decreaseMaxNumberOfIterations() is called.
//   3. Every vertex of that DAG that is not a SignalVertex is cloned and added to dagBuilder together
//      with its incoming edges inside the DAG; SignalVertex instances are set aside.
//   4. The current DAG-incoming edges are recreated so that they point at the clones.
//   5. The DAG-outgoing edges are recreated so that they start from the clones. When the matching
//      loop edge leads to another TaskSizeSplitterVertex, that splitter's incoming-edge records are
//      updated instead of dagBuilder.
//   6. Unless loopTerminationConditionMet(), the SignalVertex instances set aside in step 3 are
//      cloned and added with their incoming edges.
//   7. markEdgesToOptimize(...) is called, the DAG-incoming edges are rebuilt for the next call, and
//      increaseTestingTrial() is called.
//
// Parameter dagBuilder: the builder that receives the cloned vertices and the new edges.
// Returns: this vertex (the same instance, after its bookkeeping has been updated).
// Maps each vertex of the internal DAG to the clone created for this iteration.
final HashMap<IRVertex, IRVertex> originalToNewIRVertex = new HashMap<>();
// SignalVertex instances found in the internal DAG; they are only cloned in step 6.
final HashSet<IRVertex> originalUtilityVertices = new HashSet<>();
// New edges collected here are handed to markEdgesToOptimize(...) near the end of the method.
final HashSet<IREdge> edgesToOptimize = new HashSet<>();
if (testingTrial.intValue() == 0) {
insertSignalVertex(new SignalVertex());
}
// Sources of DAG-incoming edges whose transform is a SignalTransform (filled in step 4).
final List<OperatorVertex> previousSignalVertex = new ArrayList<>(1);
// Read after the optional insertSignalVertex call above.
// NOTE(review): presumably the returned DAG already contains the inserted SignalVertex - confirm.
final DAG<IRVertex, IREdge> dagToAdd = getDAG();
// Runs before every loopTerminationConditionMet() check below.
// NOTE(review): presumably that check depends on the decremented counter - confirm.
decreaseMaxNumberOfIterations();
// Step 3: add the working vertices and their incoming edges to the dagBuilder.
dagToAdd.topologicalDo(irVertex -> {
if (!(irVertex instanceof SignalVertex)) {
final IRVertex newIrVertex = irVertex.getClone();
setParallelismPropertyByTestingTrial(newIrVertex);
originalToNewIRVertex.putIfAbsent(irVertex, newIrVertex);
dagBuilder.addVertex(newIrVertex, dagToAdd);
dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
// The source's clone is looked up in the map filled by earlier visits of this traversal.
// NOTE(review): a SignalVertex source would have no entry yet, making newSrc null - confirm
// that SignalVertex never has outgoing edges inside this DAG.
final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
// Same communication pattern as the original edge; remaining properties are copied below.
final IREdge newIrEdge =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), newSrc, newIrVertex);
edge.copyExecutionPropertiesTo(newIrEdge);
setSubPartitionSetPropertyByTestingTrial(newIrEdge);
edgesToOptimize.add(newIrEdge);
dagBuilder.connectVertices(newIrEdge);
});
} else {
// Set aside; handled after the outgoing edges, and only if the loop has not terminated.
originalUtilityVertices.add(irVertex);
}
});
// Step 4: process the initial DAG incoming edges for the first loop.
// Each edge keeps its original source and is re-targeted at the clone of its destination.
getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
final IREdge newIrEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
edge.getSrc(), originalToNewIRVertex.get(dstVertex));
edge.copyExecutionPropertiesTo(newIrEdge);
setSubPartitionSetPropertyByTestingTrial(newIrEdge);
// An edge coming from an OperatorVertex with a SignalTransform is not added to edgesToOptimize;
// its source is remembered instead. All other edges are added to edgesToOptimize.
if (edge.getSrc() instanceof OperatorVertex
&& ((OperatorVertex) edge.getSrc()).getTransform() instanceof SignalTransform) {
previousSignalVertex.add((OperatorVertex) edge.getSrc());
} else {
edgesToOptimize.add(newIrEdge);
}
// Connected in dagBuilder in both cases.
dagBuilder.connectVertices(newIrEdge);
}));
// Step 5: process the DAG outgoing edges.
getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edgeFromOriginal -> {
// Entries are matched by comparing edge IDs, so every entry of the map is scanned for each
// outgoing edge rather than using a keyed lookup.
for (Map.Entry<IREdge, IREdge> entry : this.getEdgeWithInternalVertexToEdgeWithLoop().entrySet()) {
if (entry.getKey().getId().equals(edgeFromOriginal.getId())) {
final IREdge correspondingEdge = entry.getValue(); // edge to next splitter vertex
if (correspondingEdge.getDst() instanceof TaskSizeSplitterVertex) {
// Destination is another splitter: update its incoming-edge records; dagBuilder is not touched.
TaskSizeSplitterVertex nextSplitter = (TaskSizeSplitterVertex) correspondingEdge.getDst();
IRVertex dstVertex = edgeFromOriginal.getDst(); // vertex inside of next splitter vertex
// Changes are collected first and applied after the loop over the next splitter's edges.
List<IREdge> edgesToDelete = new ArrayList<>();
List<IREdge> edgesToAdd = new ArrayList<>();
// NOTE(review): get(dstVertex) is dereferenced without a null check - confirm the next splitter
// always holds an entry for dstVertex.
for (IREdge edgeToDst : nextSplitter.getDagIncomingEdges().get(dstVertex)) {
// Only edges whose source has the same ID as srcVertex are replaced.
if (edgeToDst.getSrc().getId().equals(srcVertex.getId())) {
final IREdge newIrEdge = new IREdge(
edgeFromOriginal.getPropertyValue(CommunicationPatternProperty.class).get(),
originalToNewIRVertex.get(srcVertex),
edgeFromOriginal.getDst());
// Execution properties are taken from the next splitter's recorded edge (edgeToDst),
// not from edgeFromOriginal.
edgeToDst.copyExecutionPropertiesTo(newIrEdge);
edgesToDelete.add(edgeToDst);
edgesToAdd.add(newIrEdge);
// Register a loop-level edge that starts from the cloned source on the next splitter.
// NOTE(review): arguments are passed as (loop edge, internal edge) - confirm this matches
// the parameter order of mapEdgeWithLoop.
final IREdge newLoopEdge = Util.cloneEdge(
correspondingEdge, newIrEdge.getSrc(), correspondingEdge.getDst());
nextSplitter.mapEdgeWithLoop(newLoopEdge, newIrEdge);
}
}
// The replaced edges are removed from the next splitter only once this loop has terminated;
// otherwise they remain recorded alongside the new edges added below.
if (loopTerminationConditionMet()) {
for (IREdge edgeToDelete : edgesToDelete) {
nextSplitter.removeDagIncomingEdge(edgeToDelete);
nextSplitter.removeNonIterativeIncomingEdge(edgeToDelete);
}
}
for (IREdge edgeToAdd : edgesToAdd) {
nextSplitter.addDagIncomingEdge(edgeToAdd);
nextSplitter.addNonIterativeIncomingEdge(edgeToAdd);
}
} else {
// Destination is not a splitter: add the original destination vertex to dagBuilder and connect
// it to the clone of the source.
final IREdge newIrEdge = new IREdge(
edgeFromOriginal.getPropertyValue(CommunicationPatternProperty.class).get(),
originalToNewIRVertex.get(srcVertex), edgeFromOriginal.getDst());
edgeFromOriginal.copyExecutionPropertiesTo(newIrEdge);
dagBuilder.addVertex(edgeFromOriginal.getDst()).connectVertices(newIrEdge);
}
}
}
}));
// Step 6: if loop termination condition is false, add signal vertex
if (!loopTerminationConditionMet()) {
for (IRVertex helper : originalUtilityVertices) {
final IRVertex newHelper = helper.getClone();
originalToNewIRVertex.putIfAbsent(helper, newHelper);
setParallelismPropertyByTestingTrial(newHelper);
dagBuilder.addVertex(newHelper, dagToAdd);
// Unlike the working-vertex edges in step 3, these edges are neither passed to
// setSubPartitionSetPropertyByTestingTrial nor added to edgesToOptimize.
dagToAdd.getIncomingEdgesOf(helper).forEach(edge -> {
final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
final IREdge newIrEdge =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), newSrc, newHelper);
edge.copyExecutionPropertiesTo(newIrEdge);
dagBuilder.connectVertices(newIrEdge);
});
}
}
// Step 7: assign signal vertex of n-th iteration with nonIterativeIncomingEdges of (n+1)th iteration
markEdgesToOptimize(previousSignalVertex, edgesToOptimize);
// process next iteration's DAG incoming edges, and add them as the next loop's incoming edges:
// clear, as we're done with the current loop and need to prepare it for the next one.
this.getDagIncomingEdges().clear();
// The non-iterative incoming edges are re-added unchanged.
this.getNonIterativeIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
if (!loopTerminationConditionMet()) {
// Each iterative incoming edge is recreated from the clone of its source (made in this call)
// to dstVertex, the map key, which is the original vertex and not a clone.
this.getIterativeIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
final IREdge newIrEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
originalToNewIRVertex.get(edge.getSrc()), dstVertex);
edge.copyExecutionPropertiesTo(newIrEdge);
this.addDagIncomingEdge(newIrEdge);
}));
}
increaseTestingTrial();
return this;
}