private VertexState handleInitEvent()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [3079:3179]


    private VertexState handleInitEvent(VertexImpl vertex) {
      VertexState state = vertex.setupVertex();
      if (state.equals(VertexState.FAILED)) {
        return state;
      }

      // TODO move before to handle NEW state
      if (vertex.targetVertices != null) {
        for (Edge e : vertex.targetVertices.values()) {
          if (e.getEdgeManager() == null) {
            Preconditions
                .checkState(
                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
            vertex.uninitializedEdges.add(e);
          }
        }
      }
      if (vertex.sourceVertices != null) {
        for (Edge e : vertex.sourceVertices.values()) {
          if (e.getEdgeManager() == null) {
            Preconditions
                .checkState(
                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
            vertex.uninitializedEdges.add(e);
          }
        }
      }

      // Create tasks based on initial configuration, but don't start them yet.
      if (vertex.numTasks == -1) {
        // this block must always return VertexState.INITIALIZING
        LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
            + " to set #tasks for the vertex " + vertex.getLogIdentifier());

        if (vertex.hasInputInitializers()) {
          if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) {
            LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
            try {
              vertex.setupInputInitializerManager();
            } catch (TezException e) {
              String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
              LOG.info(msg);
              return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
            }
          }
          return VertexState.INITIALIZING;
        } else {
          boolean hasOneToOneUninitedSource = false;
          for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
            if (entry.getValue().getEdgeProperty().getDataMovementType() ==
                DataMovementType.ONE_TO_ONE) {
              if (entry.getKey().getTotalTasks() == -1) {
                hasOneToOneUninitedSource = true;
                break;
              }
            }
          }
          if (hasOneToOneUninitedSource) {
            LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
            return VertexState.INITIALIZING;
          }
          if (vertex.vertexPlan.hasVertexManagerPlugin()) {
            LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
            return VertexState.INITIALIZING;
          }
          throw new TezUncheckedException(vertex.getLogIdentifier() +
          " has -1 tasks but does not have input initializers, " +
          "1-1 uninited sources or custom vertex manager to set it at runtime");
        }
      } else {
        LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
        vertex.createTasks();
        // this block may return VertexState.INITIALIZING
        if (vertex.hasInputInitializers() && (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
          LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
          try {
            vertex.setupInputInitializerManager();
          } catch (TezException e) {
            String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
            LOG.error(msg);
            return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
          }
          return VertexState.INITIALIZING;
        }
        if (!vertex.uninitializedEdges.isEmpty()) {
          LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
          return VertexState.INITIALIZING;
        }
        LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
        // vertex is completely configured. Send out notification now.
        vertex.maybeSendConfiguredEvent();
        boolean isInitialized = vertex.initializeVertex();
        if (isInitialized) {
          return VertexState.INITED;
        } else {
          return VertexState.FAILED;
        }
      }
    }