protected synchronized void handle()

in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [738:907]


  /**
   * Dispatches a single {@link DAGAppMasterEvent} to the matching handling logic.
   *
   * <p>The method is {@code synchronized}, so at most one event is processed at a time per
   * instance. Depending on the event type it records diagnostics, updates {@code state},
   * notifies the scheduler / launcher / communicator managers, and may ask
   * {@code shutdownHandler} to shut down.
   *
   * @param event the event to process; it is down-cast according to {@code event.getType()}
   * @throws TezUncheckedException if the event type has no handler in this method
   */
  protected synchronized void handle(DAGAppMasterEvent event) {
    String errDiagnostics;
    switch (event.getType()) {
    case SCHEDULING_SERVICE_ERROR:
      // Scheduling error - probably an issue with the communication with the RM
      // In this scenario, the AM should shutdown. Expectation is that the RM
      // will restart a new AM attempt.
      // Should not kill the current running DAG to ensure that on restart, we
      // can recover it and continue.
      DAGAppMasterEventSchedulingServiceError schedulingServiceErrorEvent =
          (DAGAppMasterEventSchedulingServiceError) event;
      state = DAGAppMasterState.ERROR;
      errDiagnostics = "Error in the TaskScheduler. Shutting down. "
          + "Error=" + schedulingServiceErrorEvent.getDiagnosticInfo();
      addDiagnostic(errDiagnostics);
      // Log the same message that is recorded as a diagnostic, so the underlying cause
      // is visible in the AM log and not only in the diagnostics.
      LOG.error(errDiagnostics);
      shutdownHandler.shutdown();
      break;
    case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR:
    case CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR:
    case TASK_SCHEDULER_SERVICE_FATAL_ERROR:
      // A fatal error from the pluggable services. The AM cannot continue operation, and should
      // be shutdown. The AM should not be restarted for recovery.
      DAGAppMasterEventUserServiceFatalError usfe = (DAGAppMasterEventUserServiceFatalError) event;
      Throwable error = usfe.getError();
      errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo()
          + ", eventType=" + event.getType()
          + ", exception=" + (error == null ? "None" : ExceptionUtils.getStackTrace(error));
      LOG.error(errDiagnostics, error);
      addDiagnostic(errDiagnostics);

      handleInternalError("Service error: " + event.getType(), errDiagnostics);
      break;
    case INTERNAL_ERROR:
      handleInternalError("DAGAppMaster Internal Error occurred",
          "DAGAppMaster Internal Error occurred");
      break;
    case DAG_FINISHED:
      // Stop every registered hook before any completion bookkeeping.
      for (TezDAGHook hook : hooks) {
        hook.stop();
      }
      DAGAppMasterEventDAGFinished finishEvt =
          (DAGAppMasterEventDAGFinished) event;
      // The completion marker is written to both stderr and stdout with the same timestamp.
      String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
      System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
      System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());

      currentDAG.onFinish();

      if (!isSession) {
        // Non-session mode: the AM exits once its DAG is done.
        LOG.info("Not a session, AM will unregister as DAG has completed");
        this.taskSchedulerManager.setShouldUnregisterFlag();
        updateLoggers(currentDAG, "_post");
        setStateOnDAGCompletion();
        LOG.info("Shutting down on completion of dag:" + finishEvt.getDAGId());
        shutdownHandler.shutdown();
      } else {
        LOG.info("DAG completed, dagId=" + finishEvt.getDAGId() + ", dagState="
            + finishEvt.getDAGState());
        lastDAGCompletionTime = clock.getTime();
        updateLoggers(currentDAG, "_post");
        if (this.historyEventHandler.hasRecoveryFailed()) {
          String recoveryErrorMsg = "Recovery had a fatal error, shutting down session after" +
              " DAG completion";
          LOG.warn(recoveryErrorMsg);
          addDiagnostic(recoveryErrorMsg);
          sessionStopped.set(true);
        }
        // Update the per-outcome counters; prewarm DAGs are excluded from all of them.
        switch(finishEvt.getDAGState()) {
        case SUCCEEDED:
          if (!isCurrentDagPrewarm()) {
            successfulDAGs.incrementAndGet();
          }
          break;
        case FAILED:
          if (!isCurrentDagPrewarm()) {
            failedDAGs.incrementAndGet();
          }
          break;
        case KILLED:
          if (!isCurrentDagPrewarm()) {
            killedDAGs.incrementAndGet();
          }
          break;
        case ERROR:
          if (!isCurrentDagPrewarm()) {
            failedDAGs.incrementAndGet();
          }
          // Intentional fall-through: a DAG in ERROR state also takes the AM down.
        default:
          LOG.error("Received a DAG Finished Event with state="
              + finishEvt.getDAGState()
              + ". Error. Shutting down.");
          addDiagnostic("DAG completed with an ERROR state. Shutting down AM");
          state = DAGAppMasterState.ERROR;
          this.taskSchedulerManager.setShouldUnregisterFlag();
          shutdownHandler.shutdown();
          break;
        }
        // If the switch above moved the AM to ERROR, shutdown was already requested there.
        if (!state.equals(DAGAppMasterState.ERROR)) {
          if (!sessionStopped.get()) {
            LOG.info("Central Dispatcher queue size after DAG completion, before cleanup: " +
                dispatcher.getQueueSize());
            LOG.info("Waiting for next DAG to be submitted.");

            // Sending this via the event queue, in case there are pending events which need to be
            // processed. TaskKilled for example, or ContainerCompletions.
            // The DAG needs to be part of the event, since the dag can get reset when the next
            // dag is submitted. The next DAG, however, will not start executing till the cleanup
            // is complete, since execution start is on the same dispatcher.
            sendEvent(new DAGAppMasterEventDagCleanup(context.getCurrentDAG()));

            // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
            // However, eventually it needs to be moved out.
            this.taskSchedulerManager.dagCompleted();
          } else {
            LOG.info("Session shutting down now.");
            this.taskSchedulerManager.setShouldUnregisterFlag();
            // The final AM state reflects whether recovery failed.
            if (this.historyEventHandler.hasRecoveryFailed()) {
              state = DAGAppMasterState.FAILED;
            } else {
              state = DAGAppMasterState.SUCCEEDED;
            }
            shutdownHandler.shutdown();
          }
        }
      }
      //close all fs related caches
      try {
        FileSystem.closeAllForUGI(context.getCurrentDAG().getDagUGI());
      } catch (IOException e) {
        // Best effort: a failure to close cached file systems is logged, not propagated.
        LOG.warn("Error occurred when trying to close FileSystem for userName " + context
            .getCurrentDAG().getDagUGI().getUserName(), e);
      }
      break;
    case AM_REBOOT:
      LOG.info("Received an AM_REBOOT signal");
      this.state = DAGAppMasterState.KILLED;
      shutdownHandler.shutdown(true);
      break;
    case DAG_CLEANUP:
      DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
      LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
          cleanupEvent.getDag().getID());
      containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), jobTokenSecretManager);
      taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
      nodes.dagComplete(cleanupEvent.getDag());
      containers.dagComplete(cleanupEvent.getDag());
      LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
          cleanupEvent.getDag().getID());
      // Move to IDLE under idleStateLock and notify one thread waiting on that lock.
      synchronized (idleStateLock) {
        state = DAGAppMasterState.IDLE;
        idleStateLock.notify();
      }
      break;
    case NEW_DAG_SUBMITTED:
      // Inform sub-components that a new DAG has been submitted.
      taskSchedulerManager.dagSubmitted();
      containerLauncherManager.dagSubmitted();
      taskCommunicatorManager.dagSubmitted();
      break;
    default:
      throw new TezUncheckedException(
          "AppMaster: No handler for event type: " + event.getType());
    }
  }

  /**
   * Tells whether the name of {@code currentDAG} starts with the prewarm DAG name prefix.
   *
   * @return {@code true} if the current DAG's name begins with
   *         {@code TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX}
   */
  private boolean isCurrentDagPrewarm() {
    return currentDAG.getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX);
  }