in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1151:1227]
DAGState initializeDAG(DAGInitializedEvent event) {
if (event != null) {
initTime = event.getInitTime();
} else {
initTime = clock.getTime();
}
commitAllOutputsOnSuccess = conf.getBoolean(
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
// If we have no vertices, fail the dag
numVertices = getJobPlan().getVertexCount();
if (numVertices == 0) {
addDiagnostic("No vertices for dag");
trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
if (event != null) {
return DAGState.FAILED;
}
return finished(DAGState.FAILED);
}
if (jobPlan.getVertexGroupsCount() > 0) {
for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
for (String vertexName : groupInfo.groupMembers) {
List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
if (groupList == null) {
groupList = Lists.newLinkedList();
vertexGroupInfo.put(vertexName, groupList);
}
groupList.add(groupInfo);
}
}
}
// create the vertices`
for (int i=0; i < numVertices; ++i) {
String vertexName = getJobPlan().getVertex(i).getName();
VertexImpl v = createVertex(this, vertexName, i);
addVertex(v);
}
createDAGEdges(this);
Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
// setup the dag
for (Vertex v : vertices.values()) {
parseVertexEdges(this, edgePlans, v);
}
// Initialize the edges, now that the payload and vertices have been set.
for (Edge e : edges.values()) {
e.initialize();
}
assignDAGScheduler(this);
for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
String groupName = entry.getKey();
VertexGroupInfo groupInfo = entry.getValue();
if (!groupInfo.outputs.isEmpty()) {
// shared outputs
for (String vertexName : groupInfo.groupMembers) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting shared outputs for group: " + groupName +
" on vertex: " + vertexName);
}
Vertex v = getVertex(vertexName);
v.addSharedOutputs(groupInfo.outputs);
}
}
}
return DAGState.INITED;
}