public synchronized void serviceInit()

in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [289:424]


  /**
   * Initializes the application master.
   *
   * <p>In order, this method: decides whether this is the last AM attempt, creates the
   * dispatcher and the running context, builds each sub-component and adds it via
   * {@code addIfService}, registers the event handlers with the dispatcher, resolves the
   * staging and recovery paths, loads the session local resources when {@code isSession} is
   * set, calls {@code initServices} and {@code super.serviceInit}, hands an
   * {@link AMLaunchedEvent} to the history event handler, and finally moves the state to
   * {@link DAGAppMasterState#INITED}.
   *
   * <p>The statements are intentionally kept in their original order: later steps consume
   * objects created by earlier ones (for example the heartbeat handlers are passed to the
   * task attempt listener, which is in turn passed to the container map).
   *
   * @param conf configuration kept as {@code amConf}; it is modified here
   *             ({@code Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY} is set to {@code true})
   * @throws RuntimeException if no session token is present in the AM credentials
   * @throws NumberFormatException if the max-attempts environment variable is set but is not
   *                               a valid integer
   * @throws Exception any other failure raised while creating components, resolving the
   *                   recovery file system, or reading the session resources file
   */
  public synchronized void serviceInit(final Configuration conf) throws Exception {

    // Maximum number of attempts is read from the environment; 1 is used when it is unset.
    int maxAppAttempts = 1;
    String maxAppAttemptsEnv = System.getenv(
        ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
    if (maxAppAttemptsEnv != null) {
      // parseInt yields the primitive directly (no boxing round-trip as with valueOf).
      maxAppAttempts = Integer.parseInt(maxAppAttemptsEnv);
    }
    // This attempt counts as the last one once its id has reached the maximum.
    isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;

    this.amConf = conf;
    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

    dispatcher = createDispatcher();
    context = new RunningAppContext(conf);

    clientHandler = new DAGClientHandler(this);

    // NOTE(review): the boolean passed to addIfService differs for the dispatcher (false)
    // and every other component (true) — confirm its meaning against addIfService.
    addIfService(dispatcher, false);

    clientRpcServer = new DAGClientServer(clientHandler, appAttemptID);
    addIfService(clientRpcServer, true);

    taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
    addIfService(taskHeartbeatHandler, true);

    containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
    addIfService(containerHeartbeatHandler, true);

    // The session token is mandatory; initialization is aborted without it.
    sessionToken =
        TokenCache.getSessionToken(amTokens);
    if (sessionToken == null) {
      throw new RuntimeException("Could not find session token in AM Credentials");
    }

    // Prepare the TaskAttemptListener server for authentication of Containers
    // TaskAttemptListener gets the information via jobTokenSecretManager.
    LOG.info("Adding session token to jobTokenSecretManager for application");
    jobTokenSecretManager.addTokenForJob(
        appAttemptID.getApplicationId().toString(), sessionToken);

    // Service to handle requests to TaskUmbilicalProtocol.
    taskAttemptListener = createTaskAttemptListener(context,
        taskHeartbeatHandler, containerHeartbeatHandler);
    addIfService(taskAttemptListener, true);

    // Container tracking: receives AMContainerEventType events from the dispatcher.
    containerSignatureMatcher = createContainerSignatureMatcher();
    containers = new AMContainerMap(containerHeartbeatHandler,
        taskAttemptListener, containerSignatureMatcher, context);
    addIfService(containers, true);
    dispatcher.register(AMContainerEventType.class, containers);

    // Node tracking: receives AMNodeEventType events from the dispatcher.
    nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
    addIfService(nodes, true);
    dispatcher.register(AMNodeEventType.class, nodes);

    this.dagEventDispatcher = new DagEventDispatcher();
    this.vertexEventDispatcher = new VertexEventDispatcher();

    // Register the event dispatchers.
    dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
    dispatcher.register(DAGEventType.class, dagEventDispatcher);
    dispatcher.register(VertexEventType.class, vertexEventDispatcher);
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
    dispatcher.register(TaskAttemptEventType.class,
        new TaskAttemptEventDispatcher());

    this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
        clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
    addIfService(taskSchedulerEventHandler, true);
    if (isLastAMRetry) {
      // On the last attempt the scheduler handler is told to unregister.
      LOG.info("AM will unregister as this is the last attempt"
          + ", currentAttempt=" + appAttemptID.getAttemptId()
          + ", maxAttempts=" + maxAppAttempts);
      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
    }

    dispatcher.register(AMSchedulerEventType.class,
        taskSchedulerEventHandler);
    // The scheduler handler was constructed with clientRpcServer, and is declared here as
    // depending on it.
    addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);

    containerLauncher = createContainerLauncher(context);
    addIfService(containerLauncher, true);
    dispatcher.register(NMCommunicatorEventType.class, containerLauncher);

    historyEventHandler = new HistoryEventHandler(context);
    addIfService(historyEventHandler, true);

    // The configuration key is expressed in seconds; the value is scaled by 1000
    // (presumably to milliseconds — confirm against the consumers of this field).
    this.sessionTimeoutInterval = 1000 * amConf.getInt(
            TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
            TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);

    // Derive the staging directory, the recovery directory beneath it, the file system
    // backing the recovery directory, and the recovery directory for this attempt.
    String strAppId = this.appAttemptID.getApplicationId().toString();
    this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
    recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
    recoveryFS = recoveryDataDir.getFileSystem(conf);
    currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
        appAttemptID.getAttemptId());
    if (LOG.isDebugEnabled()) {
      // Logged at debug level so the level matches the guard above.
      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
          + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
          + " recoveryAttemptDir :" + currentRecoveryDataDir);
    }
    if (isSession) {
      // The resources file is read as two delimited PlanLocalResourcesProto messages, in
      // this order: session resources first, then AM resources.
      FileInputStream sessionResourcesStream = null;
      try {
        sessionResourcesStream = new FileInputStream(
          TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
        PlanLocalResourcesProto sessionLocalResourcesProto =
          PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
        PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
            .parseDelimitedFrom(sessionResourcesStream);
        sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
          sessionLocalResourcesProto));
        // amResources receives the AM entries and then every session entry.
        amResources.putAll(DagTypeConverters.convertFromPlanLocalResources(amLocalResourceProto));
        amResources.putAll(sessionResources);
      } finally {
        // Always release the file handle, including when parsing fails.
        if (sessionResourcesStream != null) {
          sessionResourcesStream.close();
        }
      }
    }

    recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);

    // Initialize the services added above, then the superclass.
    initServices(conf);
    super.serviceInit(conf);

    // Record the AM launch with the history handler once initialization has completed.
    AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
        startTime, appSubmitTime, appMasterUgi.getShortUserName());
    historyEventHandler.handle(
        new DAGHistoryEvent(launchedEvent));

    this.state = DAGAppMasterState.INITED;

  }