protected void serviceInit()

in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [430:655]


  /**
   * Initializes the application master from the supplied configuration.
   *
   * <p>In order, this method:
   * <ol>
   *   <li>stores {@code conf} as the AM configuration and reads basic settings
   *       (sleep time before exit, local mode);</li>
   *   <li>parses the task scheduler, container launcher and task communicator plugin
   *       descriptors;</li>
   *   <li>compares the client version with the AM version;</li>
   *   <li>creates the dispatcher, the application context and the individual AM components,
   *       adding them via {@code addIfService} and registering their event handlers on the
   *       dispatcher;</li>
   *   <li>loads the session local resources (session mode only, and only when the version
   *       check did not fail);</li>
   *   <li>creates the shared executor, then calls {@code initServices(conf)} and
   *       {@code super.serviceInit(conf)};</li>
   *   <li>emits the launch history events and sets the resulting AM state.</li>
   * </ol>
   *
   * <p>When the client and AM versions differ and the check is not disabled through
   * {@code TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK}, initialization still runs to
   * completion, but no launch history events are emitted and the state becomes
   * {@code DAGAppMasterState.ERROR} instead of {@code DAGAppMasterState.INITED}.
   *
   * <p>NOTE(review): the statements below are presumably order-sensitive (components are
   * added and registered in a specific sequence) - confirm before reordering anything.
   *
   * @param conf the configuration to initialize from; it is kept as {@code amConf} and, in
   *             local mode, modified in place (node blacklisting is disabled and the history
   *             logging service class is reset to its default)
   * @throws Exception if any initialization step fails; in particular a
   *                   {@link RuntimeException} is thrown when the AM credentials contain no
   *                   session token
   */
  protected void serviceInit(final Configuration conf) throws Exception {

    this.amConf = conf;
    initResourceCalculatorPlugins();
    this.hadoopShim = new HadoopShimsLoader(this.amConf).getHadoopShim();

    // The configuration key is expressed in milliseconds. A negative value means the
    // shutdown handler is left with whatever sleep time it already has.
    long sleepTimeBeforeExitMillis = this.amConf.getLong(
        TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS,
        TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT);
    if (sleepTimeBeforeExitMillis >= 0) {
      this.shutdownHandler.setSleepTimeBeforeExit(sleepTimeBeforeExitMillis);
    }

    this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

    // Payload built from the AM configuration; handed to the plugin parser below.
    UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);

    // Descriptor lists are filled in by parseAllPlugins and consumed further down when the
    // corresponding managers are created.
    List<NamedEntityDescriptor> taskSchedulerDescriptors = Lists.newLinkedList();
    List<NamedEntityDescriptor> containerLauncherDescriptors = Lists.newLinkedList();
    List<NamedEntityDescriptor> taskCommunicatorDescriptors = Lists.newLinkedList();

    parseAllPlugins(taskSchedulerDescriptors, taskSchedulers, containerLauncherDescriptors,
        containerLaunchers, taskCommunicatorDescriptors, taskCommunicators, amPluginDescriptorProto,
        isLocal, defaultPayload);


    LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
    LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
    LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));

    boolean disableVersionCheck = conf.getBoolean(
        TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
        TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT);

    // Check client - AM version compatibility.
    // A mismatch is always recorded as a diagnostic; it only flips versionMismatch (and thus
    // the final AM state) when the check has not been disabled.
    LOG.info("Comparing client version with AM version"
        + ", clientVersion=" + clientVersion
        + ", AMVersion=" + dagVersionInfo.getVersion());
    Simple2LevelVersionComparator versionComparator = new Simple2LevelVersionComparator();
    if (versionComparator.compare(clientVersion, dagVersionInfo.getVersion()) != 0) {
      versionMismatchDiagnostics = "Incompatible versions found"
          + ", clientVersion=" + clientVersion
          + ", AMVersion=" + dagVersionInfo.getVersion();
      addDiagnostic(versionMismatchDiagnostics);
      if (disableVersionCheck) {
        LOG.warn("Ignoring client-AM version mismatch as check disabled. "
            + versionMismatchDiagnostics);
      } else {
        LOG.error(versionMismatchDiagnostics);
        versionMismatch = true;
      }
    }

    dispatcher = createDispatcher();

    if (isLocal) {
      // Local mode overrides two settings directly on the supplied configuration.
      conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
      conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
          TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
    } else {
      dispatcher.enableExitOnDispatchException();
    }
    String strAppId = this.appAttemptID.getApplicationId().toString();
    this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);

    context = new RunningAppContext(conf);
    this.aclManager = new ACLManager(appMasterUgi.getShortUserName(), this.amConf);

    clientHandler = new DAGClientHandler(this);

    addIfService(dispatcher, false);

    // Recovery locations: a base recovery directory under the staging directory plus a
    // per-attempt directory derived from the current attempt id.
    recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
    recoveryFS = recoveryDataDir.getFileSystem(conf);
    currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
        appAttemptID.getAttemptId());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
          + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
          + " recoveryAttemptDir :" + currentRecoveryDataDir);
    }
    recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);

    initClientRpcServer();

    taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
    addIfService(taskHeartbeatHandler, true);

    containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
    addIfService(containerHeartbeatHandler, true);

    jobTokenSecretManager = new JobTokenSecretManager(amConf);

    // The session token is mandatory; initialization is aborted without it.
    sessionToken =
        TokenCache.getSessionToken(amCredentials);
    if (sessionToken == null) {
      throw new RuntimeException("Could not find session token in AM Credentials");
    }

    // Prepare the TaskAttemptListener server for authentication of Containers
    // TaskAttemptListener gets the information via jobTokenSecretManager.
    jobTokenSecretManager.addTokenForJob(
        appAttemptID.getApplicationId().toString(), sessionToken);



    //service to handle requests to TaskUmbilicalProtocol
    taskCommunicatorManager = createTaskCommunicatorManager(context,
        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors);
    addIfService(taskCommunicatorManager, true);

    containerSignatureMatcher = createContainerSignatureMatcher();
    containers = new AMContainerMap(containerHeartbeatHandler,
        taskCommunicatorManager, containerSignatureMatcher, context);
    addIfService(containers, true);
    dispatcher.register(AMContainerEventType.class, containers);

    nodes = new AMNodeTracker(dispatcher.getEventHandler(), context);
    addIfService(nodes, true);
    dispatcher.register(AMNodeEventType.class, nodes);

    this.dagEventDispatcher = new DagEventDispatcher();
    this.vertexEventDispatcher = new VertexEventDispatcher();

    //register the event dispatchers
    dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
    dispatcher.register(DAGEventType.class, dagEventDispatcher);
    dispatcher.register(VertexEventType.class, vertexEventDispatcher);
    boolean useConcurrentDispatcher =
        conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
            TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
    LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
    if (!useConcurrentDispatcher) {
      // Task and task-attempt events are registered directly on the main dispatcher.
      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
    } else {
      // Task and task-attempt events share one dedicated concurrent dispatcher.
      int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
          TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
      AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
          TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
      dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
          new TaskAttemptEventDispatcher(), sharedDispatcher);
    }

    // register other delegating dispatchers
    dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
        "Speculator");

    if (enableWebUIService()) {
      this.webUIService = new WebUIService(context);
      addIfService(webUIService, false);
    } else {
      LOG.debug("Web UI Service is not enabled.");
    }

    this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors);
    addIfService(taskSchedulerManager, true);

    // The web UI service (when enabled) and the client RPC server are declared as
    // dependencies of the task scheduler manager.
    if (enableWebUIService()) {
      addIfServiceDependency(taskSchedulerManager, webUIService);
    }

    dispatcher.register(AMSchedulerEventType.class,
        taskSchedulerManager);
    addIfServiceDependency(taskSchedulerManager, clientRpcServer);

    appMasterReadinessService = createAppMasterReadinessService();

    this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors,
        isLocal);
    addIfService(containerLauncherManager, true);
    dispatcher.register(ContainerLauncherEventType.class, containerLauncherManager);

    historyEventHandler = createHistoryEventHandler(context);
    addIfService(historyEventHandler, true);

    this.sessionTimeoutInterval = TezCommonUtils.getDAGSessionTimeout(amConf);
    this.clientAMHeartbeatTimeoutIntervalMillis =
        TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConf);

    // Session local resources are read from a file in the working directory, but only for a
    // session AM whose version check did not fail.
    if (!versionMismatch && isSession) {
      try (BufferedInputStream sessionResourcesStream =
          new BufferedInputStream(
              new FileInputStream(new File(workingDirectory,
                  TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)))) {
        // NOTE(review): protobuf's parseDelimitedFrom returns null when the stream is already
        // at EOF - confirm the file can never be empty or that the converter accepts null.
        PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
            .parseDelimitedFrom(sessionResourcesStream);
        amResources.putAll(DagTypeConverters
            .convertFromPlanLocalResources(amLocalResourceProto));
      }
    }

    int threadCount = conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
        TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
    // NOTE: LinkedBlockingQueue does not have a capacity limit and can thus
    // occupy large memory chunks when numerous Runnables are pending for execution
    ExecutorService rawExecutor =
        Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder()
            .setDaemon(true).setNameFormat("App Shared Pool - #%d").build());
    execService = MoreExecutors.listeningDecorator(rawExecutor);

    initServices(conf);
    super.serviceInit(conf);

    if (!versionMismatch) {
      // The application-level launch event is recorded only by attempt 1; every attempt
      // records its own AM launch event.
      if (this.appAttemptID.getAttemptId() == 1) {
        AppLaunchedEvent appLaunchedEvent = new AppLaunchedEvent(appAttemptID.getApplicationId(),
            startTime, appSubmitTime, appMasterUgi.getShortUserName(), this.amConf,
            dagVersionInfo);
        historyEventHandler.handle(
            new DAGHistoryEvent(appLaunchedEvent));
      }
      AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
          startTime, appSubmitTime, appMasterUgi.getShortUserName());
      historyEventHandler.handle(
          new DAGHistoryEvent(launchedEvent));

      this.state = DAGAppMasterState.INITED;
    } else {
      // An enforced client/AM version mismatch leaves the AM in the ERROR state.
      this.state = DAGAppMasterState.ERROR;
    }
  }