in tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java [177:285]
/**
 * Routes a DAG history event into the recovery pipeline.
 *
 * <p>Order of checks:
 * <ul>
 *   <li>If the service has been stopped, the event is logged and dropped.</li>
 *   <li>If a fatal recovery error was already recorded, the event is dropped.</li>
 *   <li>If the service has not started yet, the event is queued.</li>
 *   <li>A DAG_SUBMITTED event whose DAG name starts with
 *       {@code TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX} marks the DAG id as
 *       skipped and is dropped.</li>
 *   <li>Events with a null DAG id, or for a skipped DAG, are dropped.</li>
 * </ul>
 *
 * <p>{@link SummaryEvent}s are handled synchronously while holding {@code lock}.
 * The summary is written and {@code summaryStream} is hsync'ed. Then the event
 * is either written to the DAG's recovery stream and flushed right away (when
 * {@code writeToRecoveryImmediately()} is true) or queued. On DAG_FINISHED the
 * DAG is added to {@code completedDAGs} and its recovery stream is closed and
 * removed from {@code outputStreamMap}. All other events are queued.
 *
 * <p>If handling a summary event fails with an {@link IOException},
 * {@code recoveryFatalErrorOccurred} is set. A flag directory is also created
 * under {@code recoveryPath} to ensure the next AM attempt does not start up.
 * From then on, all events are dropped.
 *
 * @param event the history event to record
 * @throws IOException if handling a summary event failed and either the
 *     fatal-error flag directory could not be created, or the event was
 *     DAG_SUBMITTED (so the client is told the submission failed)
 */
public void handle(DAGHistoryEvent event) throws IOException {
  HistoryEventType eventType = event.getHistoryEvent().getEventType();
  if (stopped.get()) {
    LOG.warn("Ignoring event as service stopped, eventType=" + eventType);
    return;
  }
  if (recoveryFatalErrorOccurred.get()) {
    // A previous failure already flagged recovery as unusable.
    return;
  }
  if (!started.get()) {
    LOG.warn("Adding event of type " + eventType
        + " to queue as service not started");
    eventQueue.add(event);
    return;
  }

  TezDAGID dagId = event.getDagID();
  if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
    DAGSubmittedEvent dagSubmittedEvent =
        (DAGSubmittedEvent) event.getHistoryEvent();
    String dagName = dagSubmittedEvent.getDAGName();
    if (dagName != null
        && dagName.startsWith(TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
      // Skip recording pre-warm DAG events; remember the id so later events
      // for the same DAG are skipped by the check below.
      skippedDAGs.add(dagId);
      return;
    }
  }
  if (dagId == null || skippedDAGs.contains(dagId)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Skipping event for DAG"
          + ", eventType=" + eventType
          + ", dagId=" + (dagId == null ? "null" : dagId.toString())
          + ", isSkippedDAG=" + (dagId == null ? "null"
          : skippedDAGs.contains(dagId)));
    }
    return;
  }

  if (event.getHistoryEvent() instanceof SummaryEvent) {
    synchronized (lock) {
      try {
        SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
        handleSummaryEvent(dagId, eventType, summaryEvent);
        summaryStream.hsync();
        if (summaryEvent.writeToRecoveryImmediately()) {
          handleRecoveryEvent(event);
          doFlush(outputStreamMap.get(dagId),
              appContext.getClock().getTime(), true);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
                + eventType.name());
          }
          eventQueue.add(event);
        }
        if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
          LOG.info("DAG completed"
              + ", dagId=" + dagId
              + ", queueSize=" + eventQueue.size());
          completedDAGs.add(dagId);
          if (outputStreamMap.containsKey(dagId)) {
            try {
              outputStreamMap.get(dagId).close();
            } catch (IOException ioe) {
              LOG.warn("Error when trying to flush/close recovery file for"
                  + " dag, dagId=" + dagId, ioe);
            } finally {
              // Remove the entry even if close() failed, so a broken stream
              // for a completed DAG is not left behind in the map.
              outputStreamMap.remove(dagId);
            }
          }
        }
      } catch (IOException ioe) {
        LOG.error("Error handling summary event"
            + ", eventType=" + eventType, ioe);
        Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
        boolean flagCreated = false;
        // Only the filesystem calls are guarded here; the decision to rethrow
        // is made below, outside this try, so it is not caught by this catch.
        try {
          LOG.error("Adding a flag to ensure next AM attempt does not start up"
              + ", flagFile=" + fatalErrorDir.toString());
          recoveryFatalErrorOccurred.set(true);
          recoveryDirFS.mkdirs(fatalErrorDir);
          flagCreated = recoveryDirFS.exists(fatalErrorDir);
        } catch (IOException e) {
          LOG.fatal("Failed to create fatal error flag dir "
              + fatalErrorDir.toString(), e);
          throw ioe;
        }
        if (!flagCreated) {
          // Throw error if fatal error flag could not be set
          LOG.fatal("Failed to create fatal error flag dir "
              + fatalErrorDir.toString(), ioe);
          throw ioe;
        }
        LOG.error("Recovery failure occurred. Skipping all events");
        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
          // Throw error to tell client that dag submission failed
          throw ioe;
        }
        // For other event types the failure is recorded via the flag dir and
        // recoveryFatalErrorOccurred, and is not propagated to the caller.
      }
    }
  } else {
    // All other events just get queued
    if (LOG.isDebugEnabled()) {
      LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
    }
    eventQueue.add(event);
  }
}