public synchronized void serviceStart()

in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [1457:1550]


  public synchronized void serviceStart() throws Exception {
    // Start-up sequence of the app master service:
    //   1. start the child components and the parent service,
    //   2. initialize metrics and publish an AMStartedEvent to the history handler,
    //   3. try to recover DAG data via recoverDAG(); an IOException there marks the
    //      AM as ERROR, requests shutdown and returns early,
    //   4. if data was recovered, dispatch a DAGEventRecoverEvent and move to RUNNING;
    //      otherwise, in non-session mode, reset the DAG counter and call startDAG(),
    //   5. in session mode, schedule the periodic session-timeout check.

    // start all the components
    startServices();
    super.serviceStart();

    // metrics system init is really init & start.
    // It's more test friendly to put it here.
    DefaultMetricsSystem.initialize("DAGAppMaster");

    this.appsStartTime = clock.getTime();
    AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
        appsStartTime, appMasterUgi.getShortUserName());
    historyEventHandler.handle(
        new DAGHistoryEvent(startEvent));

    this.lastDAGCompletionTime = clock.getTime();

    RecoveredDAGData recoveredDAGData;
    try {
      recoveredDAGData = recoverDAG();
    } catch (IOException e) {
      LOG.error("Error occurred when trying to recover data from previous attempt."
          + " Shutting down AM", e);
      this.state = DAGAppMasterState.ERROR;
      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
      shutdownHandler.shutdown();
      return;
    }

    if (!isSession) {
      LOG.info("In Non-Session mode.");
    } else {
      LOG.info("In Session mode. Waiting for DAG over RPC");
      this.state = DAGAppMasterState.IDLE;
    }

    if (recoveredDAGData != null) {
      List<URL> classpathUrls = null;
      if (recoveredDAGData.cumulativeAdditionalResources != null) {
        classpathUrls = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
        amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
        cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
      }

      // "Terminal" means the previous attempt's DAG is not resumed: it is replayed
      // straight into a final state instead of being re-run.
      final boolean terminal = recoveredDAGData.isCompleted
          || recoveredDAGData.nonRecoverable;
      if (terminal) {
        // String concatenation already renders a null dagState as "null".
        LOG.info("Found previous DAG in completed or non-recoverable state"
            + ", dagId=" + recoveredDAGData.recoveredDagID
            + ", isCompleted=" + recoveredDAGData.isCompleted
            + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
            + ", state=" + recoveredDAGData.dagState
            + ", failureReason=" + recoveredDAGData.reason);
      } else {
        LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
      }

      _updateLoggers(recoveredDAGData.recoveredDAG, "");

      final DAGEventRecoverEvent recoverDAGEvent;
      if (terminal) {
        // A non-recoverable DAG is forced to FAILED; a completed one keeps the
        // state that was recorded for it.
        DAGState desiredState = recoveredDAGData.nonRecoverable
            ? DAGState.FAILED : recoveredDAGData.dagState;
        recoverDAGEvent = new DAGEventRecoverEvent(
            recoveredDAGData.recoveredDAG.getID(), desiredState, classpathUrls);
      } else {
        recoverDAGEvent = new DAGEventRecoverEvent(
            recoveredDAGData.recoveredDAG.getID(), classpathUrls);
      }
      dagEventDispatcher.handle(recoverDAGEvent);
      this.state = DAGAppMasterState.RUNNING;
    } else if (!isSession) {
      // No dag recovered - in non-session, just restart the original DAG
      dagCounter.set(0);
      startDAG();
    }

    if (isSession) {
      // java.util.Timer rejects a non-positive period with IllegalArgumentException,
      // and the integer division below yields 0 for any interval smaller than 10.
      // Clamp the period to 1 so a very short timeout no longer aborts start-up.
      // (A negative interval is still rejected by Timer as an invalid delay.)
      final long checkPeriod = Math.max(1L, sessionTimeoutInterval / 10);
      this.dagSubmissionTimer = new Timer(true);
      this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
          checkAndHandleSessionTimeout();
        }
      }, sessionTimeoutInterval, checkPeriod);
    }
  }