private VertexState setupVertex()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [2724:2820]


  private VertexState setupVertex() {

    this.initTimeRequested = clock.getTime();

    // VertexManager needs to be setup before attempting to Initialize any
    // Inputs - since events generated by them will be routed to the
    // VertexManager for handling.
    if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
      List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
      for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
        if (groupInfo.edgeMergedInputs.containsKey(getName())) {
          InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
          groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
              Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
        }
      }
      if (!groupSpecList.isEmpty()) {
        groupInputSpecList = groupSpecList;
      }
    }

    // Check if any inputs need initializers
    if (rootInputDescriptors != null) {
      LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
          + rootInputDescriptors);
      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input
          : rootInputDescriptors.values()) {
        if (input.getControllerDescriptor() != null &&
            input.getControllerDescriptor().getClassName() != null) {
          if (!hasInputInitializers()) {
            inputsWithInitializers = Sets.newHashSet();
          }
          inputsWithInitializers.add(input.getName());
          LOG.info("Starting root input initializer for input: "
              + input.getName() + ", with class: ["
              + input.getControllerDescriptor().getClassName() + "]");
        }
      }
    }

    boolean hasBipartite = false;
    if (sourceVertices != null) {
      for (Edge edge : sourceVertices.values()) {
        if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
          hasBipartite = true;
          break;
        }
      }
    }

    if (hasBipartite && hasInputInitializers()) {
      LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
      return finished(VertexState.FAILED);
    }

    numTasks = getVertexPlan().getTaskConfig().getNumTasks();
    if (!(numTasks == -1 || numTasks >= 0)) {
      addDiagnostic("Invalid task count for vertex"
          + ", numTasks=" + numTasks);
      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
      return VertexState.FAILED;
    }

    checkTaskLimits();
    // set VertexManager as the last step. Because in recovery case, we may need to restore 
    // some info from last the AM attempt and skip the initialization step. Otherwise numTasks may be
    // reset to -1 after the restore.
    try {
      assignVertexManager();
    } catch (TezException e1) {
      String msg = "Fail to create VertexManager, " + ExceptionUtils.getStackTrace(e1);
      LOG.error(msg);
      return finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
    }

    try {
      vertexManager.initialize();
      vmIsInitialized.set(true);
      if (!pendingVmEvents.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " +
              logIdentifier);
        }
        for (VertexManagerEvent vmEvent : pendingVmEvents) {
          vertexManager.onVertexManagerEventReceived(vmEvent);
        }
        pendingVmEvents.clear();
      }
    } catch (AMUserCodeException e) {
      String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
      LOG.error(msg, e);
      finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
          msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
      return VertexState.FAILED;
    }
    return VertexState.INITED;
  }