DAGState initializeDAG()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1151:1227]


  /**
   * Builds the runtime structure of this DAG from its job plan.
   *
   * <p>In order, this method: records the init time, reads the
   * commit-all-outputs-on-success setting, fails the DAG if the plan has no
   * vertices, registers vertex groups and the per-vertex group membership,
   * creates and adds every vertex, creates and initializes the edges, assigns
   * the DAG scheduler, and finally hands each group's shared outputs to the
   * group's member vertices.
   *
   * @param event when non-null, its init time is used and a zero-vertex plan
   *        returns {@link DAGState#FAILED} directly; when null, the current
   *        clock time is used and a zero-vertex plan goes through
   *        {@code finished(DAGState.FAILED)}.
   *        NOTE(review): a non-null event presumably means state is being
   *        replayed (e.g. recovery) - confirm against callers.
   * @return {@link DAGState#INITED} on success, or the failure state described
   *         above when the plan contains no vertices
   */
  DAGState initializeDAG(DAGInitializedEvent event) {
    // Take the init time from the event when one is supplied, else from the clock.
    if (event == null) {
      initTime = clock.getTime();
    } else {
      initTime = event.getInitTime();
    }

    commitAllOutputsOnSuccess = conf.getBoolean(
        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);

    // A plan without vertices cannot run: record why and fail.
    numVertices = getJobPlan().getVertexCount();
    if (numVertices == 0) {
      addDiagnostic("No vertices for dag");
      trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
      // Only the event-less path goes through finished().
      return (event == null) ? finished(DAGState.FAILED) : DAGState.FAILED;
    }

    if (jobPlan.getVertexGroupsCount() > 0) {
      // First pass: register every group under its name.
      for (PlanVertexGroupInfo planGroup : jobPlan.getVertexGroupsList()) {
        String planGroupName = planGroup.getGroupName();
        vertexGroups.put(planGroupName, new VertexGroupInfo(planGroup));
      }
      // Second pass: index the registered groups by member vertex name.
      for (VertexGroupInfo group : vertexGroups.values()) {
        for (String memberName : group.groupMembers) {
          List<VertexGroupInfo> memberGroups = vertexGroupInfo.get(memberName);
          if (memberGroups == null) {
            memberGroups = Lists.newLinkedList();
            vertexGroupInfo.put(memberName, memberGroups);
          }
          memberGroups.add(group);
        }
      }
    }

    // Create the vertices, in plan order.
    int vertexIndex = 0;
    while (vertexIndex < numVertices) {
      String plannedName = getJobPlan().getVertex(vertexIndex).getName();
      VertexImpl created = createVertex(this, plannedName, vertexIndex);
      addVertex(created);
      ++vertexIndex;
    }

    createDAGEdges(this);
    Map<String, EdgePlan> edgePlanByName =
        DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());

    // Wire each vertex to its edges.
    for (Vertex vertex : vertices.values()) {
      parseVertexEdges(this, edgePlanByName, vertex);
    }

    // Edges are initialized only after the payload and vertices have been set.
    for (Edge edge : edges.values()) {
      edge.initialize();
    }

    assignDAGScheduler(this);

    // Propagate each group's shared outputs to all of its member vertices.
    for (Map.Entry<String, VertexGroupInfo> groupEntry : vertexGroups.entrySet()) {
      String groupName = groupEntry.getKey();
      VertexGroupInfo group = groupEntry.getValue();
      if (group.outputs.isEmpty()) {
        continue; // nothing shared by this group
      }
      for (String memberName : group.groupMembers) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Setting shared outputs for group: " + groupName +
              " on vertex: " + memberName);
        }
        getVertex(memberName).addSharedOutputs(group.outputs);
      }
    }
    return DAGState.INITED;
  }