in tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java [511:910]
/**
 * Reads the recovery data left behind by the previous attempt, copies it into the current
 * attempt's recovery directory and replays it to rebuild the last completed or in-progress DAG.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Copy the summary file record by record into the current attempt directory while
 *       building per-DAG summary data and tracking the highest DAG id seen.</li>
 *   <li>Hand the highest DAG id to the DAGAppMaster via {@code setDAGCounter}.</li>
 *   <li>Pick the last completed or in-progress DAG and check whether it is recoverable.</li>
 *   <li>Copy that DAG's recovery file event by event into the current attempt directory,
 *       applying each event to the recovered DAG / vertex / task.</li>
 *   <li>If the DAG is neither completed nor non-recoverable, apply the summary events that
 *       were buffered for it.</li>
 *   <li>Create the data-recovered flag file.</li>
 * </ol>
 *
 * <p>All streams opened or consumed here are closed even when parsing fails part-way.
 *
 * <p>NOTE(review): when no completed/in-progress DAG is found the method returns {@code null}
 * without creating the data-recovered flag file, unlike the two earlier "nothing to recover"
 * exits. This mirrors the previous behavior; confirm it is intended.
 *
 * @return the recovered DAG data, or {@code null} when there is nothing to recover (previous
 *         attempt directory missing, summary file missing, or no completed/in-progress DAG)
 * @throws IOException if the recovery file of the selected DAG cannot be found, or if reading
 *         or writing recovery data fails
 */
public RecoveredDAGData parseRecoveryData() throws IOException {
  Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
  LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
      + " for recovering data from previous attempt");
  if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
    LOG.info("Nothing to recover as previous attempt data does not exist"
        + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
    createDataRecoveredFlagFile();
    return null;
  }

  Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
  FSDataInputStream summaryStream = getSummaryStream(summaryPath);
  if (summaryStream == null) {
    LOG.info("Nothing to recover as summary file does not exist"
        + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
        + ", summaryPath=" + summaryPath.toString());
    createDataRecoveredFlagFile();
    return null;
  }

  Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
      new HashMap<TezDAGID, DAGSummaryData>();
  int dagCounter =
      copySummaryAndCollectDAGData(summaryPath, summaryStream, dagSummaryDataMap);

  // Set counter for next set of DAGs
  dagAppMaster.setDAGCounter(dagCounter);

  DAGSummaryData lastInProgressDAGData =
      getLastCompletedOrInProgressDAG(dagSummaryDataMap);
  if (lastInProgressDAGData == null) {
    LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
    return null;
  }
  TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
  if (lastInProgressDAG == null) {
    LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
    return null;
  }

  LOG.info("Checking if DAG is in recoverable state"
      + ", dagId=" + lastInProgressDAGData.dagId);

  final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
  if (lastInProgressDAGData.completed) {
    recoveredDAGData.isCompleted = true;
    recoveredDAGData.dagState = lastInProgressDAGData.dagState;
  }

  String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
  if (nonRecoverableReason != null) {
    LOG.warn("Found last inProgress DAG but not recoverable: "
        + lastInProgressDAGData);
    recoveredDAGData.nonRecoverable = true;
    recoveredDAGData.reason = nonRecoverableReason;
  }

  LOG.info("Trying to recover dag from recovery file"
      + ", dagId=" + lastInProgressDAG.toString()
      + ", dataDir=" + previousAttemptRecoveryDataDir
      + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);
  FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
      previousAttemptRecoveryDataDir, lastInProgressDAG);
  if (dagRecoveryStream == null) {
    // Could not find data to recover
    // Error out
    throw new IOException("Could not find recovery data for last in progress DAG"
        + ", dagId=" + lastInProgressDAG);
  }

  FSDataOutputStream newDAGRecoveryStream = null;
  boolean dagDataCopied = false;
  try {
    LOG.info("Copying DAG data into Current Attempt directory"
        + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
        lastInProgressDAG));
    newDAGRecoveryStream =
        getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);

    replayDAGRecoveryEvents(dagRecoveryStream, newDAGRecoveryStream,
        lastInProgressDAG, recoveredDAGData);

    dagRecoveryStream.close();
    newDAGRecoveryStream.hsync();
    newDAGRecoveryStream.close();
    dagDataCopied = true;
  } finally {
    if (!dagDataCopied) {
      // Something failed above: release the file handles without masking the original error.
      closeQuietlyOnFailure(dagRecoveryStream);
      closeQuietlyOnFailure(newDAGRecoveryStream);
    }
  }

  if (!recoveredDAGData.isCompleted
      && !recoveredDAGData.nonRecoverable) {
    applyBufferedSummaryEvents(lastInProgressDAGData, recoveredDAGData);
  }

  LOG.info("Finished copying data from previous attempt into current attempt");
  createDataRecoveredFlagFile();

  return recoveredDAGData;
}

/**
 * Copies every summary record from the previous attempt's summary stream into a new summary
 * file under the current attempt directory, feeding each record to the per-DAG summary data.
 * Both streams are closed before returning, whether or not the copy succeeds.
 *
 * @param summaryPath path of the previous attempt's summary file (used for logging its status)
 * @param summaryStream open stream over the previous attempt's summary file
 * @param dagSummaryDataMap filled with one entry per DAG id encountered
 * @return the highest DAG id seen in the summary, or 0 if none
 */
private int copySummaryAndCollectDAGData(Path summaryPath, FSDataInputStream summaryStream,
    Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) throws IOException {
  FSDataOutputStream newSummaryStream = null;
  boolean summaryCopied = false;
  int dagCounter = 0;
  try {
    Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir);
    newSummaryStream = getSummaryOutputStream(newSummaryPath);

    FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
    LOG.info("Parsing summary file"
        + ", path=" + summaryPath.toString()
        + ", len=" + summaryFileStatus.getLen()
        + ", lastModTime=" + summaryFileStatus.getModificationTime());

    // values() allocates a fresh array on every call; look it up once for the whole loop.
    final HistoryEventType[] eventTypes = HistoryEventType.values();

    while (true) {
      RecoveryProtos.SummaryEventProto proto;
      try {
        proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
      } catch (EOFException eof) {
        LOG.info("Reached end of summary stream");
        break;
      }
      if (proto == null) {
        LOG.info("Reached end of summary stream");
        break;
      }

      HistoryEventType eventType = eventTypes[proto.getEventType()];
      if (LOG.isDebugEnabled()) {
        LOG.debug("[RECOVERY SUMMARY]"
            + " dagId=" + proto.getDagId()
            + ", timestamp=" + proto.getTimestamp()
            + ", event=" + eventType);
      }

      TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
      if (dagCounter < dagId.getId()) {
        dagCounter = dagId.getId();
      }

      DAGSummaryData dagSummaryData = dagSummaryDataMap.get(dagId);
      if (dagSummaryData == null) {
        dagSummaryData = new DAGSummaryData(dagId);
        dagSummaryDataMap.put(dagId, dagSummaryData);
      }
      dagSummaryData.handleSummaryEvent(proto);

      proto.writeDelimitedTo(newSummaryStream);
    }

    summaryStream.close();
    newSummaryStream.hsync();
    newSummaryStream.close();
    summaryCopied = true;
    return dagCounter;
  } finally {
    if (!summaryCopied) {
      closeQuietlyOnFailure(summaryStream);
      closeQuietlyOnFailure(newSummaryStream);
    }
  }
}

/**
 * Reads events from the previous attempt's DAG recovery stream until the stream ends, a read
 * fails, or an earlier event requested that the remainder be skipped. Each event is applied
 * to the recovered state and then re-written to the current attempt's recovery stream.
 * This method does not close either stream.
 */
private void replayDAGRecoveryEvents(FSDataInputStream dagRecoveryStream,
    FSDataOutputStream newDAGRecoveryStream, TezDAGID dagId,
    RecoveredDAGData recoveredDAGData) throws IOException {
  boolean skipAllOtherEvents = false;
  while (true) {
    HistoryEvent event;
    try {
      event = getNextEvent(dagRecoveryStream);
    } catch (EOFException eof) {
      LOG.info("Reached end of dag recovery stream");
      break;
    } catch (IOException ioe) {
      // A partially written trailing record is tolerated: keep what was read so far.
      LOG.warn("Corrupt data found when trying to read next event", ioe);
      break;
    }
    if (event == null) {
      LOG.info("Reached end of dag recovery stream");
      break;
    }
    // Checked only after the read, so the event following the "skip" trigger is consumed
    // (and any end-of-stream / corruption message logged) but neither applied nor copied.
    if (skipAllOtherEvents) {
      break;
    }

    skipAllOtherEvents = applyRecoveredEvent(event, dagId, recoveredDAGData);

    HistoryEventType eventType = event.getEventType();
    if (LOG.isDebugEnabled()) {
      LOG.debug("[DAG RECOVERY]"
          + " dagId=" + dagId
          + ", eventType=" + eventType
          + ", event=" + event.toString());
    }
    newDAGRecoveryStream.writeInt(eventType.ordinal());
    event.toProtoStream(newDAGRecoveryStream);
  }
}

/**
 * Applies one event from the DAG recovery file to the recovered DAG, vertex or task.
 *
 * @return {@code true} if all events after this one should be skipped: either the DAG was
 *         just created but is flagged non-recoverable, or the DAG has finished
 * @throws RuntimeException if the event type is not one handled by recovery
 */
private boolean applyRecoveredEvent(HistoryEvent event, TezDAGID dagId,
    RecoveredDAGData recoveredDAGData) throws IOException {
  HistoryEventType eventType = event.getEventType();
  boolean skipRemainingEvents = false;
  switch (eventType) {
    case DAG_SUBMITTED:
    {
      DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
      logRecoveredEvent(eventType, event);
      recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
          dagId);
      recoveredDAGData.cumulativeAdditionalResources = submittedEvent
          .getCumulativeAdditionalLocalResources();
      recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
      dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
      if (recoveredDAGData.nonRecoverable) {
        skipRemainingEvents = true;
      }
      break;
    }
    // These DAG-level events are all handed straight to the recovered DAG.
    case DAG_INITIALIZED:
    case DAG_STARTED:
    case DAG_COMMIT_STARTED:
    case VERTEX_GROUP_COMMIT_STARTED:
    case VERTEX_GROUP_COMMIT_FINISHED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      recoveredDAGData.recoveredDAG.restoreFromEvent(event);
      break;
    }
    case DAG_FINISHED:
    {
      logRecoveredEvent(eventType, event);
      // If this is seen, nothing to recover
      assert recoveredDAGData.recoveredDAG != null;
      recoveredDAGData.recoveredDAG.restoreFromEvent(event);
      recoveredDAGData.isCompleted = true;
      recoveredDAGData.dagState =
          ((DAGFinishedEvent) event).getState();
      skipRemainingEvents = true;
      break;
    }
    case CONTAINER_LAUNCHED:
    {
      // Nothing to do for now
      break;
    }
    case VERTEX_INITIALIZED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
      Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
      v.restoreFromEvent(vEvent);
      break;
    }
    case VERTEX_STARTED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      VertexStartedEvent vEvent = (VertexStartedEvent) event;
      Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
      v.restoreFromEvent(vEvent);
      break;
    }
    case VERTEX_PARALLELISM_UPDATED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
      Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
      v.restoreFromEvent(vEvent);
      break;
    }
    case VERTEX_COMMIT_STARTED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
      Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
      v.restoreFromEvent(vEvent);
      break;
    }
    case VERTEX_FINISHED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
      Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
      v.restoreFromEvent(vEvent);
      break;
    }
    case TASK_STARTED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      TaskStartedEvent tEvent = (TaskStartedEvent) event;
      Task task = recoveredDAGData.recoveredDAG.getVertex(
          tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
      task.restoreFromEvent(tEvent);
      break;
    }
    case TASK_FINISHED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
      Task task = recoveredDAGData.recoveredDAG.getVertex(
          tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
      task.restoreFromEvent(tEvent);
      break;
    }
    case TASK_ATTEMPT_STARTED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
      Task task =
          recoveredDAGData.recoveredDAG.getVertex(
              tEvent.getTaskAttemptID().getTaskID().getVertexID())
                  .getTask(tEvent.getTaskAttemptID().getTaskID());
      task.restoreFromEvent(tEvent);
      break;
    }
    case TASK_ATTEMPT_FINISHED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
      Task task =
          recoveredDAGData.recoveredDAG.getVertex(
              tEvent.getTaskAttemptID().getTaskID().getVertexID())
                  .getTask(tEvent.getTaskAttemptID().getTaskID());
      task.restoreFromEvent(tEvent);
      break;
    }
    case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
    {
      logRecoveredEvent(eventType, event);
      assert recoveredDAGData.recoveredDAG != null;
      VertexDataMovementEventsGeneratedEvent vEvent =
          (VertexDataMovementEventsGeneratedEvent) event;
      Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
      v.restoreFromEvent(vEvent);
      break;
    }
    default:
      throw new RuntimeException("Invalid data found, unknown event type "
          + eventType);
  }
  return skipRemainingEvents;
}

/**
 * Applies the summary events buffered for the selected DAG to the recovered DAG. A buffered
 * vertex-finished event whose vertex cannot be found marks the DAG as non-recoverable;
 * the remaining buffered events are still processed.
 *
 * @throws RuntimeException if a buffered event has an unexpected type
 */
private void applyBufferedSummaryEvents(DAGSummaryData lastInProgressDAGData,
    RecoveredDAGData recoveredDAGData) throws IOException {
  if (lastInProgressDAGData.bufferedSummaryEvents == null
      || lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
    return;
  }
  for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
    assert recoveredDAGData.recoveredDAG != null;
    switch (bufferedEvent.getEventType()) {
      case VERTEX_GROUP_COMMIT_STARTED:
      case VERTEX_GROUP_COMMIT_FINISHED:
        recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
        break;
      case VERTEX_FINISHED:
      {
        VertexFinishedEvent vertexFinishedEvent =
            (VertexFinishedEvent) bufferedEvent;
        Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
            vertexFinishedEvent.getVertexID());
        if (vertex == null) {
          recoveredDAGData.nonRecoverable = true;
          recoveredDAGData.reason = "All state could not be recovered"
              + ", vertex completed but events not flushed"
              + ", vertexId=" + vertexFinishedEvent.getVertexID();
        } else {
          vertex.restoreFromEvent(vertexFinishedEvent);
        }
        break;
      }
      default:
        throw new RuntimeException("Invalid data found in buffered summary events"
            + ", unknown event type "
            + bufferedEvent.getEventType());
    }
  }
}

/** Logs that the given event is being replayed during recovery. */
private void logRecoveredEvent(HistoryEventType eventType, HistoryEvent event) {
  LOG.info("Recovering from event"
      + ", eventType=" + eventType
      + ", event=" + event.toString());
}

/**
 * Closes a stream on an error path. A failure to close is logged rather than thrown so that
 * it cannot replace the exception that is already propagating. A {@code null} stream is
 * ignored.
 */
private void closeQuietlyOnFailure(java.io.Closeable stream) {
  if (stream == null) {
    return;
  }
  try {
    stream.close();
  } catch (IOException e) {
    LOG.warn("Failed to close recovery stream after an earlier failure", e);
  }
}