private VertexState handleInitEvent()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [2558:2668]


    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
      VertexState state = vertex.setupVertex();
      if (state.equals(VertexState.FAILED)) {
        return state;
      }
      // TODO move before to handle NEW state 
      if (vertex.targetVertices != null) {
        for (Edge e : vertex.targetVertices.values()) {
          if (e.getEdgeManager() == null) {
            Preconditions
                .checkState(
                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
            vertex.uninitializedEdges.add(e);
          }
        }
      }
      if (vertex.sourceVertices != null) {
        for (Edge e : vertex.sourceVertices.values()) {
          if (e.getEdgeManager() == null) {
            Preconditions
                .checkState(
                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
            vertex.uninitializedEdges.add(e);
          }
        }
      }
      
      // Create tasks based on initial configuration, but don't start them yet.
      if (vertex.numTasks == -1) {
        LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
            + " to set #tasks for the vertex " + vertex.getVertexId());

        if (vertex.inputsWithInitializers != null) {
          // Use DAGScheduler to arbitrate resources among vertices later
          vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
              vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
              vertex.eventHandler, -1,
              vertex.appContext.getTaskScheduler().getNumClusterNodes(),
              vertex.getTaskResource(),
              vertex.appContext.getTaskScheduler().getTotalResources());
          List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
              .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
          for (String inputName : vertex.inputsWithInitializers) {
            inputList.add(vertex.rootInputDescriptors.get(inputName));
          }
          LOG.info("Vertex will initialize via inputInitializers "
              + vertex.logIdentifier + ". Starting root input initializers: "
              + vertex.inputsWithInitializers.size());
          vertex.rootInputInitializerManager.runInputInitializers(inputList);
          return VertexState.INITIALIZING;
        } else {
          boolean hasOneToOneUninitedSource = false;
          for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
            if (entry.getValue().getEdgeProperty().getDataMovementType() == 
                DataMovementType.ONE_TO_ONE) {
              if (entry.getKey().getTotalTasks() == -1) {
                hasOneToOneUninitedSource = true;
                break;
              }
            }
          }
          if (hasOneToOneUninitedSource) {
            LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
            return VertexState.INITIALIZING;
          }
          if (vertex.vertexPlan.hasVertexManagerPlugin()) {
            LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
            return VertexState.INITIALIZING;
          }
          throw new TezUncheckedException(vertex.getVertexId() + 
          " has -1 tasks but does not have input initializers, " +
          "1-1 uninited sources or custom vertex manager to set it at runtime");
        }
      } else {
        LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
        vertex.createTasks();
        if (vertex.inputsWithInitializers != null) {
          vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
              vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
              vertex.eventHandler, vertex.getTotalTasks(),
              vertex.appContext.getTaskScheduler().getNumClusterNodes(),
              vertex.getTaskResource(),
              vertex.appContext.getTaskScheduler().getTotalResources());
          List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
              .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
          for (String inputName : vertex.inputsWithInitializers) {
            inputList.add(vertex.rootInputDescriptors.get(inputName));
          }
          LOG.info("Starting root input initializers: "
              + vertex.inputsWithInitializers.size());
          // special case when numTasks>0 and still we want to stay in initializing 
          // state. This is handled in RootInputInitializedTransition specially.
          vertex.initWaitsForRootInitializers = true;
          vertex.rootInputInitializerManager.runInputInitializers(inputList);
          return VertexState.INITIALIZING;
        }
        if (!vertex.uninitializedEdges.isEmpty()) {
          LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
          return VertexState.INITIALIZING;
        }
        LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
        boolean isInitialized = vertex.initializeVertex();
        if (isInitialized) {
          return VertexState.INITED;
        } else {
          return VertexState.FAILED;
        }
      }
    }