in tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java [284:383]
public void handle(DAGHistoryEvent event) throws IOException {
if (stopped.get()) {
LOG.warn("Ignoring event as service stopped, eventType"
+ event.getHistoryEvent().getEventType());
return;
}
HistoryEventType eventType = event.getHistoryEvent().getEventType();
if (recoveryFatalErrorOccurred.get()) {
return;
}
if (!started.get()) {
LOG.warn("Adding event of type " + eventType
+ " to queue as service not started");
addToEventQueue(event);
return;
}
TezDAGID dagId = event.getDAGID();
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
DAGSubmittedEvent dagSubmittedEvent =
(DAGSubmittedEvent) event.getHistoryEvent();
String dagName = dagSubmittedEvent.getDAGName();
if (dagName != null
&& dagName.startsWith(
TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
// Skip recording pre-warm DAG events
skippedDAGs.add(dagId);
return;
}
}
if (dagId == null || skippedDAGs.contains(dagId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping event for DAG"
+ ", eventType=" + eventType
+ ", dagId=" + (dagId == null ? "null" : dagId.toString())
+ ", isSkippedDAG=" + (dagId == null ? "null"
: skippedDAGs.contains(dagId)));
}
return;
}
if (event.getHistoryEvent() instanceof SummaryEvent) {
synchronized (lock) {
if (stopped.get()) {
LOG.warn("Ignoring event as service stopped, eventType"
+ event.getHistoryEvent().getEventType());
return;
}
try {
SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
handleSummaryEvent(dagId, eventType, summaryEvent);
if (summaryEvent.writeToRecoveryImmediately()) {
handleRecoveryEvent(event);
// outputStream may already be closed and removed
if (outputStreamMap.containsKey(event.getDAGID())) {
doFlush(outputStreamMap.get(event.getDAGID()),
appContext.getClock().getTime());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
+ eventType.name());
}
addToEventQueue(event);
}
if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
LOG.info("DAG completed"
+ ", dagId=" + event.getDAGID()
+ ", queueSize=" + eventQueue.size());
completedDAGs.add(dagId);
if (outputStreamMap.containsKey(dagId)) {
try {
outputStreamMap.get(dagId).close();
outputStreamMap.remove(dagId);
} catch (IOException ioe) {
LOG.warn("Error when trying to flush/close recovery file for"
+ " dag, dagId=" + event.getDAGID());
}
}
}
} catch (IOException ioe) {
LOG.error("Error handling summary event"
+ ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
createFatalErrorFlagDir();
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
// Throw error to tell client that dag submission failed
throw ioe;
}
}
}
} else {
// All other events just get queued
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
}
addToEventQueue(event);
}
}