public void serviceStart()

in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [1994:2174]


  /**
   * Starts the AM's components and then either resumes work from a previous attempt or
   * begins normal operation.
   *
   * <p>Sequence performed here:
   * <ol>
   *   <li>Start all components, then the parent service.</li>
   *   <li>If this is a session AM on attempt &gt; 1 with recovery disabled, or a version
   *       mismatch was detected, move to ERROR (invalid session case), flag the AM for
   *       unregistration, trigger shutdown and return without running any DAG.</li>
   *   <li>Emit an {@link AMStartedEvent} to the history handler.</li>
   *   <li>Try to recover DAG data from the previous attempt; an {@link IOException} moves the
   *       AM to ERROR, flags it for unregistration and triggers shutdown.</li>
   *   <li>Non-session mode reads the DAG plan file; session mode goes IDLE and waits for a DAG
   *       to be submitted over RPC.</li>
   *   <li>If recovery data exists, replay it (completed / non-recoverable / in-progress DAG);
   *       otherwise, in non-session mode, start the DAG plan from scratch.</li>
   *   <li>In session mode, schedule the periodic session-timeout check and, when not running
   *       in local mode, the client heartbeat timeout check.</li>
   * </ol>
   *
   * @throws Exception propagated from the calls made here, e.g. starting services or reading
   *     the DAG plan
   */
  public void serviceStart() throws Exception {
    //start all the components
    startServices();
    super.serviceStart();

    // A relaunched (attempt > 1) session AM without recovery is treated as an invalid session.
    boolean invalidSession = false;
    if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) {
      String err = INVALID_SESSION_ERR_MSG;
      LOG.error(err);
      addDiagnostic(err);
      this.state = DAGAppMasterState.ERROR;
      invalidSession = true;
    }
    if (versionMismatch || invalidSession) {
      // Short-circuit and return as no DAG should be run
      this.taskSchedulerManager.setShouldUnregisterFlag();
      shutdownHandler.shutdown();
      return;
    }

    this.appsStartTime = clock.getTime();
    AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
        appsStartTime, appMasterUgi.getShortUserName());
    historyEventHandler.handle(
        new DAGHistoryEvent(startEvent));

    this.lastDAGCompletionTime = clock.getTime();

    DAGRecoveryData recoveredDAGData;
    try {
      recoveredDAGData = recoverDAG();
    } catch (IOException e) {
      LOG.error("Error occurred when trying to recover data from previous attempt."
          + " Shutting down AM", e);
      this.state = DAGAppMasterState.ERROR;
      this.taskSchedulerManager.setShouldUnregisterFlag();
      shutdownHandler.shutdown();
      return;
    }

    DAGPlan dagPlan = null;
    if (!isSession) {
      LOG.info("In Non-Session mode.");
      dagPlan = readDAGPlanFile();
      if (hasConcurrentEdge(dagPlan)) {
        // Currently a DAG with concurrent edge is deemed unrecoverable
        // (run from scratch) on AM failover. Proper AM failover for DAG with
        // concurrent edge is pending TEZ-4017
        if (recoveredDAGData != null) {
          LOG.warn("Ignoring recoveredDAGData for a recovered DAG with concurrent edge.");
          recoveredDAGData = null;
        }
      }
    } else {
      LOG.info("In Session mode. Waiting for DAG over RPC");
      this.state = DAGAppMasterState.IDLE;
    }

    if (recoveredDAGData != null) {
      // Re-register any additional resources that the previous attempt had localized.
      if (recoveredDAGData.cumulativeAdditionalResources != null) {
        recoveredDAGData.additionalUrlsForClasspath = processAdditionalResources(
            recoveredDAGData.recoveredDagID,
            recoveredDAGData.cumulativeAdditionalResources);
        amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
        cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
      }

      if (recoveredDAGData.isSessionStopped) {
        LOG.info("AM crashed when shutting down in the previous attempt"
            + ", continue the shutdown and recover it to SUCCEEDED");
        this.sessionStopped.set(true);
        // Intentionally returns before the session/heartbeat timers are scheduled.
        return;
      }

      if (recoveredDAGData.isCompleted
          || recoveredDAGData.nonRecoverable) {
        // String concatenation renders a null dagState as "null".
        LOG.info("Found previous DAG in completed or non-recoverable state"
            + ", dagId=" + recoveredDAGData.recoveredDagID
            + ", isCompleted=" + recoveredDAGData.isCompleted
            + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
            + ", state=" + recoveredDAGData.dagState
            + ", failureReason=" + recoveredDAGData.reason);
        updateLoggers(recoveredDAGData.recoveredDAG, "");
        if (recoveredDAGData.nonRecoverable) {
          // Non-recoverable: report the DAG as FAILED with the recorded reason.
          addDiagnostic("DAG " + recoveredDAGData.recoveredDagID + " can not be recovered due to "
              + recoveredDAGData.reason);
          DAGEventRecoverEvent recoverDAGEvent =
              new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                  DAGState.FAILED, recoveredDAGData);
          DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
              recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
              recoveredDAGData.recoveredDAG.getUserName(),
              this.clock.getTime(), DAGState.FAILED, recoveredDAGData.reason,
              this.containerLogs);
          dagRecoveredEvent.setHistoryLoggingEnabled(
              recoveredDAGData.recoveredDAG.getConf().getBoolean(
                  TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
                  TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT));
          this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
              dagRecoveredEvent));
          dagEventDispatcher.handle(recoverDAGEvent);
          this.state = DAGAppMasterState.RUNNING;
        } else {
          // Completed: replay the DAG into the state recorded by the previous attempt.
          DAGEventRecoverEvent recoverDAGEvent =
              new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                  recoveredDAGData.dagState, recoveredDAGData);
          DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
              recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
              recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(),
              recoveredDAGData.dagState, null, this.containerLogs);
          this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
              dagRecoveredEvent));
          dagEventDispatcher.handle(recoverDAGEvent);
          this.state = DAGAppMasterState.RUNNING;
        }
      } else {
        LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
        updateLoggers(recoveredDAGData.recoveredDAG, "");
        DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
            recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
            recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), this.containerLogs);
        this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
            dagRecoveredEvent));
        DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(
            recoveredDAGData.recoveredDAG.getID(), recoveredDAGData);
        dagEventDispatcher.handle(recoverDAGEvent);
        // If we reach here, then we have recoverable DAG and we need to
        // reinitialize the vertex services including speculators.
        currentDAG.onStart();
        this.state = DAGAppMasterState.RUNNING;
      }
    } else {
      if (!isSession) {
        // No dag recovered - in non-session, just restart the original DAG
        dagCounter.set(0);
        assert(dagPlan != null);
        startDAG(dagPlan, null);
      }
    }

    if (isSession && sessionTimeoutInterval >= 0) {
      // Timer.scheduleAtFixedRate rejects a non-positive period; an interval below 10 ms
      // (including 0, which the guard above admits) would otherwise yield a period of 0.
      long checkPeriodMillis = Math.max(1L, sessionTimeoutInterval / 10);
      this.dagSubmissionTimer = new Timer("DAGSubmissionTimer", true);
      this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
          try {
            checkAndHandleSessionTimeout();
          } catch (TezException e) {
            LOG.error("Error when checking AM session timeout", e);
          }
        }
      }, sessionTimeoutInterval, checkPeriodMillis);
    }

    // Ignore client heartbeat timeout in local mode or non-session mode
    if (!isLocal && isSession && clientAMHeartbeatTimeoutIntervalMillis > 0) {
      // reset heartbeat time
      clientHandler.updateLastHeartbeatTime();
      this.clientAMHeartBeatTimeoutService = Executors.newSingleThreadScheduledExecutor(
          new ThreadFactoryBuilder()
              .setDaemon(true).setNameFormat("ClientAMHeartBeatKeepAliveCheck #%d").build()
      );
      // One-shot task that reschedules itself using the delay returned by the check;
      // a non-positive value stops further checks.
      this.clientAMHeartBeatTimeoutService.schedule(new Runnable() {
        @Override
        public void run() {
          try {
            long nextExpiry = checkAndHandleDAGClientTimeout();
            if (nextExpiry > 0) {
              clientAMHeartBeatTimeoutService.schedule(this, nextExpiry, TimeUnit.MILLISECONDS);
            }
          } catch (TezException e) {
            // Cannot be thrown unless the AM is being tried to shutdown so no need to
            // reschedule the timer task
            LOG.error("Error when checking Client AM heartbeat timeout", e);
          }
        }
      }, clientAMHeartbeatTimeoutIntervalMillis, TimeUnit.MILLISECONDS);
    }

  }