public VertexImpl()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [980:1174]


  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
      String vertexName, Configuration dagConf, EventHandler eventHandler,
      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
      TaskHeartbeatHandler thh, boolean commitVertexOutputs,
      AppContext appContext, VertexLocationHint vertexLocationHint,
      Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
      StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
    this.vertexId = vertexId;
    this.vertexPlan = vertexPlan;
    this.vertexName = StringInterner.intern(vertexName);
    this.vertexConf = new Configuration(dagConf);
    this.vertexOnlyConf = new Configuration(dagOnlyConf);
    if (vertexPlan.hasVertexConf()) {
      ConfigurationProto confProto = vertexPlan.getVertexConf();
      for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
        TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
        vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
        vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
      }
    }
    this.vertexContextConfig = new VertexConfigImpl(vertexConf);

    this.clock = clock;
    this.appContext = appContext;
    this.commitVertexOutputs = commitVertexOutputs;
    this.logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";

    this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
    this.taskHeartbeatHandler = thh;
    this.eventHandler = eventHandler;
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();

    if (LOG.isDebugEnabled()) {
      logLocationHints(this.vertexName, vertexLocationHint);
    }
    setTaskLocationHints(vertexLocationHint);

    this.dagUgi = appContext.getCurrentDAG().getDagUGI();
    this.dag = appContext.getCurrentDAG();

    this.taskResource = DagTypeConverters
        .createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
    this.processorDescriptor = DagTypeConverters
        .convertProcessorDescriptorFromDAGPlan(vertexPlan
            .getProcessorDescriptor());
    this.localResources = DagTypeConverters
        .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
            .getLocalResourceList());
    this.localResources.putAll(dag.getLocalResources());
    this.environment = DagTypeConverters
        .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
            .getEnvironmentSettingList());
    this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption;
    this.recoveryData = appContext.getDAGRecoveryData() == null ?
        null : appContext.getDAGRecoveryData().getVertexRecoveryData(vertexId);
    // Set up log properties, including task specific log properties.
    String javaOptsWithoutLoggerMods =
        vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
    String logString = vertexConf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
    String [] taskLogParams = TezClientUtils.parseLogParams(logString);
    this.javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts(taskLogParams[0], javaOptsWithoutLoggerMods);

    if (taskSpecificLaunchCmdOpts.hasModifiedLogProperties()) {
      String [] taskLogParamsTaskSpecific = taskSpecificLaunchCmdOption.getTaskSpecificLogParams();
      this.javaOptsTaskSpecific = TezClientUtils
          .maybeAddDefaultLoggingJavaOpts(taskLogParamsTaskSpecific[0], javaOptsWithoutLoggerMods);

      environmentTaskSpecific = new HashMap<String, String>(this.environment.size());
      environmentTaskSpecific.putAll(environment);
      if (taskLogParamsTaskSpecific.length == 2 && !Strings.isNullOrEmpty(taskLogParamsTaskSpecific[1])) {
        TezClientUtils.addLogParamsToEnv(environmentTaskSpecific, taskLogParamsTaskSpecific);
      }
    } else {
      this.javaOptsTaskSpecific = null;
      this.environmentTaskSpecific = null;
    }

    // env for tasks which don't have task-specific configuration. Has to be set up later to
    // optionally allow copying this for specific tasks
    TezClientUtils.addLogParamsToEnv(this.environment, taskLogParams);


    this.containerContext = new ContainerContext(this.localResources,
        appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
    LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);

    if (vertexPlan.getInputsCount() > 0) {
      setAdditionalInputs(vertexPlan.getInputsList());
    }
    if (vertexPlan.getOutputsCount() > 0) {
      setAdditionalOutputs(vertexPlan.getOutputsList());
    }
    this.stateChangeNotifier = entityStatusTracker;

    // Setup the initial parallelism early. This may be changed after
    // initialization or on a setParallelism call.
    this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
    // Not sending the notifier a parallelism update since this is the initial parallelism

    this.dagVertexGroups = dagVertexGroups;
    
    isSpeculationEnabled =
        vertexConf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
            TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
    servicesInited = new AtomicBoolean(false);
    initServices();

    maxFailuresPercent = vertexConf.getFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT,
            TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT);

    // This "this leak" is okay because the retained pointer is in an
    //  instance variable.

    boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

    String tezDefaultComponentName =
        isLocal ? TezConstants.getTezUberServicePluginName() :
            TezConstants.getTezYarnServicePluginName();

    org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
    if (vertexPlan.hasExecutionContext()) {
      execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
      LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
    } else if (execContext != null) {
      LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
    }
    if (execContext != null) {
      if (execContext.shouldExecuteInAm()) {
        tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
      }
    }

    String taskSchedulerName = tezDefaultComponentName;
    String containerLauncherName = tezDefaultComponentName;
    String taskCommName = tezDefaultComponentName;

    if (execContext != null) {
      if (execContext.getTaskSchedulerName() != null) {
        taskSchedulerName = execContext.getTaskSchedulerName();
      }
      if (execContext.getContainerLauncherName() != null) {
        containerLauncherName = execContext.getContainerLauncherName();
      }
      if (execContext.getTaskCommName() != null) {
        taskCommName = execContext.getTaskCommName();
      }
    }

    try {
      taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
    } catch (Exception e) {
      LOG.error("Failed to get index for taskScheduler: " + taskSchedulerName);
      throw e;
    }
    try {
      taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
    } catch (Exception e) {
      LOG.error("Failed to get index for taskCommunicator: " + taskCommName);
      throw e;
    }
    try {
      containerLauncherIdentifier =
          appContext.getContainerLauncherIdentifier(containerLauncherName);
    } catch (Exception e) {
      LOG.error("Failed to get index for containerLauncher: " + containerLauncherName);
      throw e;
    }
    this.servicePluginInfo = new ServicePluginInfo()
        .setContainerLauncherName(
            appContext.getContainerLauncherName(this.containerLauncherIdentifier))
        .setTaskSchedulerName(appContext.getTaskSchedulerName(this.taskSchedulerIdentifier))
        .setTaskCommunicatorName(appContext.getTaskCommunicatorName(this.taskCommunicatorIdentifier))
        .setContainerLauncherClassName(
            appContext.getContainerLauncherClassName(this.containerLauncherIdentifier))
        .setTaskSchedulerClassName(
            appContext.getTaskSchedulerClassName(this.taskSchedulerIdentifier))
        .setTaskCommunicatorClassName(
            appContext.getTaskCommunicatorClassName(this.taskCommunicatorIdentifier));

    StringBuilder sb = new StringBuilder();
    sb.append("Running vertex: ").append(logIdentifier).append(" : ")
        .append("TaskScheduler=").append(taskSchedulerIdentifier).append(":").append(taskSchedulerName)
        .append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
        .append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
    LOG.info(sb.toString());
    cleanupShuffleDataAtVertexLevel = vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
            TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 &&
            ShuffleUtils.isTezShuffleHandler(vertexConf);
    stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
        stateMachineFactory.make(this), this);
    augmentStateMachine();
  }