in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1578:1695]
/**
 * Initializes this DAG from its plan.
 *
 * <p>In order, this:
 * <ul>
 *   <li>reads the commit-all-outputs-on-success setting;</li>
 *   <li>fails if the plan has no vertices;</li>
 *   <li>indexes vertex groups both by group name and by member vertex name;</li>
 *   <li>creates the vertices;</li>
 *   <li>in non-local mode, checks each vertex's task resource against the cluster's
 *       maximum container capability;</li>
 *   <li>creates, parses and initializes the edges;</li>
 *   <li>assigns the DAG scheduler;</li>
 *   <li>attaches group shared outputs to each group member;</li>
 *   <li>optionally writes the DAG visualization file.</li>
 * </ul>
 *
 * <p>Every failure path records a diagnostic, sets a termination cause, and calls
 * {@code finished(DAGState.FAILED)}.
 *
 * @return {@link DAGState#INITED} on success; {@link DAGState#FAILED} on failure. The
 *     zero-vertex case instead returns whatever {@code finished(DAGState.FAILED)} returns.
 */
DAGState initializeDAG() {
  commitAllOutputsOnSuccess = dagConf.getBoolean(
      TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
      TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);

  // If we have no vertices, fail the dag.
  numVertices = getJobPlan().getVertexCount();
  if (numVertices == 0) {
    addDiagnostic("No vertices for dag");
    trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
    // NOTE(review): this path returns finished()'s result, while the paths below return
    // DAGState.FAILED directly. Left as-is because finished()'s return contract is not
    // visible here.
    return finished(DAGState.FAILED);
  }

  if (jobPlan.getVertexGroupsCount() > 0) {
    // Index groups by group name.
    for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
      vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
    }
    // Build the reverse index: vertex name -> every group that vertex is a member of.
    for (VertexGroupInfo groupInfo : vertexGroups.values()) {
      for (String vertexName : groupInfo.groupMembers) {
        List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
        if (groupList == null) {
          groupList = Lists.newLinkedList();
          vertexGroupInfo.put(vertexName, groupList);
        }
        groupList.add(groupInfo);
      }
    }
  }

  // Create the vertices, one per plan vertex, using the plan index as the vertex id.
  for (int i = 0; i < numVertices; ++i) {
    String vertexName = getJobPlan().getVertex(i).getName();
    VertexImpl v = createVertex(this, vertexName, i);
    addVertex(v);
  }

  // Check task resources; only checked in non-local mode.
  if (!appContext.isLocal()) {
    for (Vertex v : vertexMap.values()) {
      // TODO TEZ-2003 (post) TEZ-2624 Ideally, this should be per source.
      if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
        String msg = "Vertex's TaskResource is beyond the cluster container capability, " +
            "Vertex=" + v.getLogIdentifier() + ", Requested TaskResource=" + v.getTaskResource()
            + ", Cluster MaxContainerCapability=" + appContext.getClusterInfo().getMaxContainerCapability();
        LOG.error(msg);
        addDiagnostic(msg);
        // Record why the DAG failed, consistent with the other setup-failure paths below.
        trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
        finished(DAGState.FAILED);
        return DAGState.FAILED;
      }
    }
  }

  try {
    createDAGEdges(this);
  } catch (TezException e2) {
    String msg = "Fail to create edges, " + ExceptionUtils.getStackTrace(e2);
    addDiagnostic(msg);
    LOG.error(msg);
    trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
    finished(DAGState.FAILED);
    return DAGState.FAILED;
  }

  // Set up the DAG: wire each vertex to its edges as described by the plan.
  Map<String, EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
  for (Vertex v : vertices.values()) {
    parseVertexEdges(this, edgePlans, v);
  }
  computeVertexDescendants();

  // Initialize the edges, now that the payload and vertices have been set.
  for (Edge e : edges.values()) {
    try {
      e.initialize();
    } catch (AMUserCodeException ex) {
      String msg = "Exception in " + ex.getSource();
      LOG.error(msg, ex);
      addDiagnostic(msg + ", " + ex.getMessage() + ", "
          + ExceptionUtils.getStackTrace(ex.getCause()));
      // Record why the DAG failed, consistent with the other setup-failure paths.
      trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
      finished(DAGState.FAILED);
      return DAGState.FAILED;
    }
  }

  try {
    assignDAGScheduler(this);
  } catch (TezException e1) {
    String msg = "Fail to assign DAGScheduler for dag:" + dagName + " due to "
        + ExceptionUtils.getStackTrace(e1);
    LOG.error(msg);
    addDiagnostic(msg);
    trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
    finished(DAGState.FAILED);
    return DAGState.FAILED;
  }

  // A group with outputs shares them across all of its member vertices.
  for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
    String groupName = entry.getKey();
    VertexGroupInfo groupInfo = entry.getValue();
    if (!groupInfo.outputs.isEmpty()) {
      // shared outputs
      for (String vertexName : groupInfo.groupMembers) {
        LOG.debug("Setting shared outputs for group: {} on vertex: {}", groupName, vertexName);
        Vertex v = getVertex(vertexName);
        v.addSharedOutputs(groupInfo.outputs);
      }
    }
  }

  // This is going to override the previously generated file
  // which didn't have the priorities
  if (getConf().getBoolean(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS,
      TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS_DEFAULT)) {
    Utils.generateDAGVizFile(this, jobPlan, logDirs, dagScheduler);
  }
  return DAGState.INITED;
}