public void handle()

in tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java [177:285]


  /**
   * Routes a DAG history event into the recovery logs.
   *
   * <p>Events are dropped when the service is stopped, when a recovery fatal
   * error has already been flagged, when the event carries no DAG id, or when
   * it belongs to a skipped DAG (a DAG whose {@code DAG_SUBMITTED} name starts
   * with the pre-warm prefix). Events arriving before the service has started
   * are queued as-is.
   *
   * <p>{@link SummaryEvent}s are handled synchronously under {@code lock}: the
   * summary is written and synced, and the event is then either written to
   * the per-DAG recovery stream immediately or queued, depending on
   * {@code writeToRecoveryImmediately()}. On {@code DAG_FINISHED} the DAG's
   * recovery stream is closed and dropped from {@code outputStreamMap}.
   * Every other event is simply added to {@code eventQueue}.
   *
   * @param event the history event to record
   * @throws IOException if handling a summary event fails and either the
   *         fatal-error flag directory could not be created, or the failing
   *         event was {@code DAG_SUBMITTED} (so the submission is reported as
   *         failed to the caller)
   */
  public void handle(DAGHistoryEvent event) throws IOException {
    if (stopped.get()) {
      LOG.warn("Ignoring event as service stopped, eventType="
          + event.getHistoryEvent().getEventType());
      return;
    }
    HistoryEventType eventType = event.getHistoryEvent().getEventType();

    // Once a fatal recovery error has been flagged, all further events are dropped.
    if (recoveryFatalErrorOccurred.get()) {
      return;
    }

    if (!started.get()) {
      LOG.warn("Adding event of type " + eventType
          + " to queue as service not started");
      eventQueue.add(event);
      return;
    }

    TezDAGID dagId = event.getDagID();
    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
      DAGSubmittedEvent dagSubmittedEvent =
          (DAGSubmittedEvent) event.getHistoryEvent();
      String dagName = dagSubmittedEvent.getDAGName();
      if (dagName != null
          && dagName.startsWith(
          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
        // Skip recording pre-warm DAG events; remember the id so that later
        // events of the same DAG are dropped as well.
        skippedDAGs.add(dagId);
        return;
      }
    }
    if (dagId == null || skippedDAGs.contains(dagId)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping event for DAG"
            + ", eventType=" + eventType
            + ", dagId=" + (dagId == null ? "null" : dagId.toString())
            + ", isSkippedDAG=" + (dagId == null ? "null"
            : skippedDAGs.contains(dagId)));
      }
      return;
    }

    if (event.getHistoryEvent() instanceof SummaryEvent) {
      synchronized (lock) {
        try {
          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
          handleSummaryEvent(dagId, eventType, summaryEvent);
          summaryStream.hsync();
          if (summaryEvent.writeToRecoveryImmediately()) {
            handleRecoveryEvent(event);
            doFlush(outputStreamMap.get(event.getDagID()),
                appContext.getClock().getTime(), true);
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Queueing Non-immediate Summary/Recovery event of type "
                  + eventType.name());
            }
            eventQueue.add(event);
          }
          if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
            LOG.info("DAG completed"
                + ", dagId=" + event.getDagID()
                + ", queueSize=" + eventQueue.size());
            completedDAGs.add(dagId);
            if (outputStreamMap.containsKey(dagId)) {
              try {
                outputStreamMap.get(dagId).close();
              } catch (IOException closeError) {
                // Best effort: the DAG is already complete, so a failed close
                // is logged (with its cause) but not propagated.
                LOG.warn("Error when trying to flush/close recovery file for"
                    + " dag, dagId=" + event.getDagID(), closeError);
              } finally {
                // Always drop the stream, even if close() failed, so a
                // completed DAG never leaves a stale entry in the map.
                outputStreamMap.remove(dagId);
              }
            }
          }
        } catch (IOException ioe) {
          LOG.error("Error handling summary event"
              + ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
          Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
          try {
            LOG.error("Adding a flag to ensure next AM attempt does not start up"
                + ", flagFile=" + fatalErrorDir.toString());
            recoveryFatalErrorOccurred.set(true);
            recoveryDirFS.mkdirs(fatalErrorDir);
            if (recoveryDirFS.exists(fatalErrorDir)) {
              LOG.error("Recovery failure occurred. Skipping all events");
            } else {
              // throw error if fatal error flag could not be set; this is
              // caught by the handler just below, which logs and rethrows.
              throw ioe;
            }
          } catch (IOException e) {
            LOG.fatal("Failed to create fatal error flag dir "
                + fatalErrorDir.toString(), e);
            // Propagate the original failure, not the flag-creation failure.
            throw ioe;
          }
          if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            // Throw error to tell client that dag submission failed
            throw ioe;
          }
        }
      }
    } else {
      // All other events just get queued
      if (LOG.isDebugEnabled()) {
        LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
      }
      eventQueue.add(event);
    }
  }