private VertexState setupVertex()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [1752:1848]


  /**
   * Performs the setup steps for this vertex and reports the resulting state.
   *
   * <p>In order, this method: records the init timestamps, collects the group
   * input specs whose merged inputs name this vertex, determines which root
   * inputs declare an initializer class, rejects the combination of a
   * scatter-gather source edge with an initializer-backed root input, assigns
   * and initializes the VertexManager, and finally resolves and validates the
   * task count.
   *
   * @param event when non-null, the init times, additional inputs and task
   *              count are taken from it; when {@code null} they come from the
   *              clock, the existing root input descriptors and the vertex
   *              plan respectively (presumably non-null means a replay of a
   *              previously recorded init - confirm against callers)
   * @return {@link VertexState#INITED} when setup succeeds; on failure either
   *         {@link VertexState#FAILED} or the result of
   *         {@code finished(VertexState.FAILED)}, depending on the branch
   */
  private VertexState setupVertex(VertexInitializedEvent event) {
    final boolean fromEvent = (event != null);

    if (fromEvent) {
      initTimeRequested = event.getInitRequestedTime();
      initedTime = event.getInitedTime();
    } else {
      initTimeRequested = clock.getTime();
    }

    // The VertexManager has to be set up before any attempt to initialize
    // Inputs, because the events those Inputs generate are routed to the
    // VertexManager for handling.

    if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
      List<GroupInputSpec> matchingSpecs = Lists.newLinkedList();
      for (VertexGroupInfo group : dagVertexGroups.values()) {
        if (!group.edgeMergedInputs.containsKey(getName())) {
          continue;
        }
        InputDescriptor merged = group.edgeMergedInputs.get(getName());
        matchingSpecs.add(new GroupInputSpec(group.groupName,
            Lists.newLinkedList(group.groupMembers), merged));
      }
      // groupInputSpecList keeps its previous value when no group matched.
      if (!matchingSpecs.isEmpty()) {
        groupInputSpecList = matchingSpecs;
      }
    }

    // Work out whether any root inputs need initializers. When an event is
    // supplied only the descriptors are replaced; no scan is performed.
    if (fromEvent) {
      this.rootInputDescriptors = event.getAdditionalInputs();
    } else if (rootInputDescriptors != null) {
      LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
          + rootInputDescriptors);
      for (RootInputLeafOutputDescriptor<InputDescriptor> rootInput : rootInputDescriptors.values()) {
        if (rootInput.getInitializerClassName() == null) {
          continue;
        }
        // The set is created lazily, on the first input that has an initializer.
        if (inputsWithInitializers == null) {
          inputsWithInitializers = Sets.newHashSet();
        }
        inputsWithInitializers.add(rootInput.getEntityName());
        LOG.info("Starting root input initializer for input: "
            + rootInput.getEntityName() + ", with class: ["
            + rootInput.getInitializerClassName() + "]");
      }
    }

    // Look for at least one incoming SCATTER_GATHER edge.
    boolean scatterGatherSource = false;
    if (sourceVertices != null) {
      for (Edge inEdge : sourceVertices.values()) {
        if (DataMovementType.SCATTER_GATHER == inEdge.getEdgeProperty().getDataMovementType()) {
          scatterGatherSource = true;
          break;
        }
      }
    }

    if (scatterGatherSource && inputsWithInitializers != null) {
      LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
      // NOTE(review): this path returns FAILED directly when an event is
      // present and goes through finished() otherwise, while the invalid
      // task-count path below does the opposite - confirm the asymmetry is
      // intended.
      return fromEvent ? VertexState.FAILED : finished(VertexState.FAILED);
    }

    assignVertexManager();

    vertexManager.initialize();

    // Setup tasks early if possible. If the VertexManager is not being used
    // to set parallelism, sending events to Tasks is safe (and less confusing
    // than relying on tasks to be created after TaskEvents are generated).
    // For VertexManagers setting parallelism, the setParallelism call needs
    // to be inline.
    if (fromEvent) {
      numTasks = event.getNumTasks();
    } else {
      numTasks = getVertexPlan().getTaskConfig().getNumTasks();
    }

    // Only -1 and non-negative task counts are accepted.
    if (numTasks != -1 && numTasks < 0) {
      addDiagnostic("Invalid task count for vertex, numTasks=" + numTasks);
      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
      if (!fromEvent) {
        return VertexState.FAILED;
      }
      abortVertex(VertexStatus.State.FAILED);
      return finished(VertexState.FAILED);
    }

    checkTaskLimits();
    return VertexState.INITED;
  }