private synchronized void handle()

in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [443:540]


  private synchronized void handle(DAGAppMasterEvent event) {
    switch (event.getType()) {
    case INTERNAL_ERROR:
      state = DAGAppMasterState.ERROR;
      if(currentDAG != null) {
        _updateLoggers(currentDAG, "_post");
        // notify dag to finish which will send the DAG_FINISHED event
        LOG.info("Internal Error. Notifying dags to finish.");
        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
      } else {
        LOG.info("Internal Error. Finishing directly as no dag is active.");
        this.taskSchedulerEventHandler.setShouldUnregisterFlag();
        shutdownHandler.shutdown();
      }
      break;
    case DAG_FINISHED:
      DAGAppMasterEventDAGFinished finishEvt =
          (DAGAppMasterEventDAGFinished) event;
      if (!isSession) {
        LOG.info("Not a session, AM will unregister as DAG has completed");
        this.taskSchedulerEventHandler.setShouldUnregisterFlag();
        _updateLoggers(currentDAG, "_post");
        setStateOnDAGCompletion();
        LOG.info("Shutting down on completion of dag:" +
              finishEvt.getDAGId().toString());
        shutdownHandler.shutdown();
      } else {
        LOG.info("DAG completed, dagId="
            + finishEvt.getDAGId().toString()
            + ", dagState=" + finishEvt.getDAGState());
        lastDAGCompletionTime = clock.getTime();
        _updateLoggers(currentDAG, "_post");
        if (this.historyEventHandler.hasRecoveryFailed()) {
          LOG.warn("Recovery had a fatal error, shutting down session after" +
              " DAG completion");
          sessionStopped.set(true);
        }
        switch(finishEvt.getDAGState()) {
        case SUCCEEDED:
          if (!currentDAG.getName().startsWith(
              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
            successfulDAGs.incrementAndGet();
          }
          break;
        case FAILED:
          if (!currentDAG.getName().startsWith(
              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
            failedDAGs.incrementAndGet();
          }
          break;
        case KILLED:
          if (!currentDAG.getName().startsWith(
              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
            killedDAGs.incrementAndGet();
          }
          break;
        case ERROR:
          if (!currentDAG.getName().startsWith(
              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
            failedDAGs.incrementAndGet();
          }
        default:
          LOG.fatal("Received a DAG Finished Event with state="
              + finishEvt.getDAGState()
              + ". Error. Shutting down.");
          state = DAGAppMasterState.ERROR;
          this.taskSchedulerEventHandler.setShouldUnregisterFlag();
          shutdownHandler.shutdown();
          break;
        }
        if (!state.equals(DAGAppMasterState.ERROR)) {
          if (!sessionStopped.get()) {
            LOG.info("Waiting for next DAG to be submitted.");
            this.taskSchedulerEventHandler.dagCompleted();
            state = DAGAppMasterState.IDLE;
          } else {
            LOG.info("Session shutting down now.");
            this.taskSchedulerEventHandler.setShouldUnregisterFlag();
            if (this.historyEventHandler.hasRecoveryFailed()) {
              state = DAGAppMasterState.FAILED;
            } else {
              state = DAGAppMasterState.SUCCEEDED;
            }
            shutdownHandler.shutdown();
          }
        }
      }
      break;
    case AM_REBOOT:
      LOG.info("Received an AM_REBOOT signal");
      this.state = DAGAppMasterState.KILLED;
      shutdownHandler.shutdown(true);
      break;
    default:
      throw new TezUncheckedException(
          "AppMaster: No handler for event type: " + event.getType());
    }
  }