DAGState initializeDAG()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1578:1695]


  DAGState initializeDAG() {
    commitAllOutputsOnSuccess = dagConf.getBoolean(
        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);

    // If we have no vertices, fail the dag
    numVertices = getJobPlan().getVertexCount();
    if (numVertices == 0) {
      addDiagnostic("No vertices for dag");
      trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
      return finished(DAGState.FAILED);
    }

    if (jobPlan.getVertexGroupsCount() > 0) {
      for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
        vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
      }
      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
        for (String vertexName : groupInfo.groupMembers) {
          List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
          if (groupList == null) {
            groupList = Lists.newLinkedList();
            vertexGroupInfo.put(vertexName, groupList);
          }
          groupList.add(groupInfo);
        }
      }
    }

    // create the vertices`
    for (int i=0; i < numVertices; ++i) {
      String vertexName = getJobPlan().getVertex(i).getName();
      VertexImpl v = createVertex(this, vertexName, i);
      addVertex(v);
    }

    // check task resources, only check it in non-local mode
    if (!appContext.isLocal()) {
      for (Vertex v : vertexMap.values()) {
        // TODO TEZ-2003 (post) TEZ-2624 Ideally, this should be per source.
        if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
          String msg = "Vertex's TaskResource is beyond the cluster container capability," +
              "Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource()
              + ", Cluster MaxContainerCapability=" + appContext.getClusterInfo().getMaxContainerCapability();
          LOG.error(msg);
          addDiagnostic(msg);
          finished(DAGState.FAILED);
          return DAGState.FAILED;
        }
      }
    }

    try {
      createDAGEdges(this);
    } catch (TezException e2) {
      String msg = "Fail to create edges, " + ExceptionUtils.getStackTrace(e2);
      addDiagnostic(msg);
      LOG.error(msg);
      trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
      finished(DAGState.FAILED);
      return DAGState.FAILED;
    }
    Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());

    // setup the dag
    for (Vertex v : vertices.values()) {
      parseVertexEdges(this, edgePlans, v);
    }

    computeVertexDescendants();

    // Initialize the edges, now that the payload and vertices have been set.
    for (Edge e : edges.values()) {
      try {
        e.initialize();
      } catch (AMUserCodeException ex) {
        String msg = "Exception in " + ex.getSource();
        LOG.error(msg, ex);
        addDiagnostic(msg + ", " + ex.getMessage() + ", "
            + ExceptionUtils.getStackTrace(ex.getCause()));
        finished(DAGState.FAILED);
        return DAGState.FAILED;
      }
    }

    try {
      assignDAGScheduler(this);
    } catch (TezException e1) {
      String msg = "Fail to assign DAGScheduler for dag:" + dagName + " due to "
          + ExceptionUtils.getStackTrace(e1);
      LOG.error(msg);
      addDiagnostic(msg);
      trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
      finished(DAGState.FAILED);
      return DAGState.FAILED;
    }

    for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
      String groupName = entry.getKey();
      VertexGroupInfo groupInfo = entry.getValue();
      if (!groupInfo.outputs.isEmpty()) {
        // shared outputs
        for (String vertexName : groupInfo.groupMembers) {
          LOG.debug("Setting shared outputs for group: {} on vertex: {}", groupName, vertexName);
          Vertex v = getVertex(vertexName);
          v.addSharedOutputs(groupInfo.outputs);
        }
      }
    }

    // This is going to override the previously generated file
    // which didn't have the priorities
    if (getConf().getBoolean(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS,
        TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS_DEFAULT)) {
      Utils.generateDAGVizFile(this, jobPlan, logDirs, dagScheduler);
    }
    return DAGState.INITED;
  }