public void handle()

in tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java [284:383]


  /**
   * Routes a DAG history event into the recovery pipeline.
   *
   * <p>The event is dropped when the service has been stopped, when the
   * fatal-recovery-error flag is set, when the event carries no DAG id, or
   * when its DAG was previously added to {@code skippedDAGs} (a
   * {@code DAG_SUBMITTED} event whose DAG name starts with the pre-warm
   * prefix adds the DAG to that set). Events that arrive before the service
   * has started are queued.
   *
   * <p>{@code SummaryEvent}s are processed while holding {@code lock}: the
   * summary is handled first, then events that request an immediate recovery
   * write are written and flushed synchronously while the others are queued.
   * On {@code DAG_FINISHED} the DAG is recorded in {@code completedDAGs} and
   * its recovery output stream, if any, is closed and removed from
   * {@code outputStreamMap}. Every non-summary event is simply queued.
   *
   * @param event the history event to handle
   * @throws IOException if handling a {@code DAG_SUBMITTED} summary event
   *     fails; the failure is rethrown after
   *     {@code createFatalErrorFlagDir()} has been invoked so that the
   *     submission failure is visible to the caller
   */
  public void handle(DAGHistoryEvent event) throws IOException {
    HistoryEventType eventType = event.getHistoryEvent().getEventType();

    if (stopped.get()) {
      LOG.warn("Ignoring event as service stopped, eventType=" + eventType);
      return;
    }

    if (recoveryFatalErrorOccurred.get()) {
      return;
    }

    if (!started.get()) {
      LOG.warn("Adding event of type " + eventType
          + " to queue as service not started");
      addToEventQueue(event);
      return;
    }

    TezDAGID dagId = event.getDAGID();
    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
      DAGSubmittedEvent dagSubmittedEvent =
          (DAGSubmittedEvent) event.getHistoryEvent();
      String dagName = dagSubmittedEvent.getDAGName();
      if (dagName != null
          && dagName.startsWith(
              TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
        // Skip recording pre-warm DAG events
        skippedDAGs.add(dagId);
        return;
      }
    }
    if (dagId == null || skippedDAGs.contains(dagId)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping event for DAG"
            + ", eventType=" + eventType
            + ", dagId=" + (dagId == null ? "null" : dagId.toString())
            + ", isSkippedDAG=" + (dagId == null ? "null"
            : skippedDAGs.contains(dagId)));
      }
      return;
    }

    if (event.getHistoryEvent() instanceof SummaryEvent) {
      synchronized (lock) {
        // Re-check under the lock: the earlier check was made without it.
        if (stopped.get()) {
          LOG.warn("Ignoring event as service stopped, eventType=" + eventType);
          return;
        }
        try {
          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
          handleSummaryEvent(dagId, eventType, summaryEvent);
          if (summaryEvent.writeToRecoveryImmediately()) {
            handleRecoveryEvent(event);
            // outputStream may already be closed and removed
            if (outputStreamMap.containsKey(dagId)) {
              doFlush(outputStreamMap.get(dagId),
                  appContext.getClock().getTime());
            }
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Queueing Non-immediate Summary/Recovery event of type "
                  + eventType.name());
            }
            addToEventQueue(event);
          }
          if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
            LOG.info("DAG completed"
                + ", dagId=" + dagId
                + ", queueSize=" + eventQueue.size());
            completedDAGs.add(dagId);
            if (outputStreamMap.containsKey(dagId)) {
              try {
                outputStreamMap.get(dagId).close();
              } catch (IOException ioe) {
                // Best effort: the DAG is finished, so a failed close is
                // logged (with its cause) rather than escalated.
                LOG.warn("Error when trying to flush/close recovery file for"
                    + " dag, dagId=" + dagId, ioe);
              } finally {
                // Always drop the entry, even when close() failed, so that a
                // stream for a completed DAG never lingers in the map.
                outputStreamMap.remove(dagId);
              }
            }
          }
        } catch (IOException ioe) {
          LOG.error("Error handling summary event"
              + ", eventType=" + eventType, ioe);
          createFatalErrorFlagDir();
          if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            // Throw error to tell client that dag submission failed
            throw ioe;
          }
        }
      }
    } else {
      // All other events just get queued
      if (LOG.isDebugEnabled()) {
        LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
      }
      addToEventQueue(event);
    }
  }