public RecoveredDAGData parseRecoveryData()

in tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java [511:910]


  /**
   * Parses the recovery data written by the previous attempt and copies what
   * it reads into the current attempt's recovery directory.
   *
   * <p>The method works in three phases:
   * <ol>
   *   <li>Read every summary event from the previous attempt's summary file,
   *       aggregate them per DAG and re-write each event into the current
   *       attempt's summary file. The highest DAG id seen is pushed to the
   *       app master as the DAG counter.</li>
   *   <li>Pick the DAG returned by {@code getLastCompletedOrInProgressDAG},
   *       replay its recovery events onto a freshly created DAG and copy each
   *       replayed event into the current attempt's DAG recovery file.</li>
   *   <li>If the DAG is neither completed nor marked non-recoverable, replay
   *       the summary events that were buffered for it.</li>
   * </ol>
   *
   * <p>All streams opened here are closed even when parsing fails part way
   * through; on the success path close/hsync failures are still propagated.
   *
   * @return the recovered DAG data, or {@code null} when there is nothing to
   *         recover (no previous attempt directory, no summary file, or no
   *         candidate DAG in the summary)
   * @throws IOException if the filesystem cannot be read or written, or if no
   *         recovery file exists for the selected DAG
   */
  public RecoveredDAGData parseRecoveryData() throws IOException {
    Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
    LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
        + " for recovering data from previous attempt");
    if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
      LOG.info("Nothing to recover as previous attempt data does not exist"
          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
      createDataRecoveredFlagFile();
      return null;
    }

    Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
    FSDataInputStream summaryStream = getSummaryStream(
        summaryPath);
    if (summaryStream == null) {
      LOG.info("Nothing to recover as summary file does not exist"
          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
          + ", summaryPath=" + summaryPath.toString());
      createDataRecoveredFlagFile();
      return null;
    }

    // ---- Phase 1: read the old summary file and copy it to the new attempt ----
    int dagCounter = 0;
    Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
        new HashMap<TezDAGID, DAGSummaryData>();
    FSDataOutputStream newSummaryStream = null;
    boolean summaryCopied = false;
    try {
      Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir);
      newSummaryStream = getSummaryOutputStream(newSummaryPath);

      FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
      LOG.info("Parsing summary file"
          + ", path=" + summaryPath.toString()
          + ", len=" + summaryFileStatus.getLen()
          + ", lastModTime=" + summaryFileStatus.getModificationTime());

      while (true) {
        RecoveryProtos.SummaryEventProto proto;
        try {
          proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
          if (proto == null) {
            LOG.info("Reached end of summary stream");
            break;
          }
        } catch (EOFException eof) {
          LOG.info("Reached end of summary stream");
          break;
        }
        HistoryEventType eventType =
            HistoryEventType.values()[proto.getEventType()];
        if (LOG.isDebugEnabled()) {
          LOG.debug("[RECOVERY SUMMARY]"
              + " dagId=" + proto.getDagId()
              + ", timestamp=" + proto.getTimestamp()
              + ", event=" + eventType);
        }
        TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
        // Track the largest DAG id so the next submitted DAG gets a fresh id.
        if (dagCounter < dagId.getId()) {
          dagCounter = dagId.getId();
        }
        DAGSummaryData summaryData = dagSummaryDataMap.get(dagId);
        if (summaryData == null) {
          summaryData = new DAGSummaryData(dagId);
          dagSummaryDataMap.put(dagId, summaryData);
        }
        summaryData.handleSummaryEvent(proto);
        proto.writeDelimitedTo(newSummaryStream);
      }
      summaryStream.close();
      newSummaryStream.hsync();
      newSummaryStream.close();
      summaryCopied = true;
    } finally {
      // Only reached with summaryCopied == false when something above threw;
      // release the streams without masking the original exception.
      if (!summaryCopied) {
        closeStreamQuietly(summaryStream);
        closeStreamQuietly(newSummaryStream);
      }
    }

    // Set counter for next set of DAGs
    dagAppMaster.setDAGCounter(dagCounter);

    // NOTE(review): unlike the two early exits above, this exit does not call
    // createDataRecoveredFlagFile() - confirm whether that is intentional.
    DAGSummaryData lastInProgressDAGData =
        getLastCompletedOrInProgressDAG(dagSummaryDataMap);
    if (lastInProgressDAGData == null || lastInProgressDAGData.dagId == null) {
      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
      return null;
    }
    TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;

    LOG.info("Checking if DAG is in recoverable state"
        + ", dagId=" + lastInProgressDAGData.dagId);

    final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
    if (lastInProgressDAGData.completed) {
      recoveredDAGData.isCompleted = true;
      recoveredDAGData.dagState = lastInProgressDAGData.dagState;
    }

    String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
    if (nonRecoverableReason != null) {
      LOG.warn("Found last inProgress DAG but not recoverable: "
          + lastInProgressDAGData);
      recoveredDAGData.nonRecoverable = true;
      recoveredDAGData.reason = nonRecoverableReason;
    }

    LOG.info("Trying to recover dag from recovery file"
        + ", dagId=" + lastInProgressDAG.toString()
        + ", dataDir=" + previousAttemptRecoveryDataDir
        + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);

    // ---- Phase 2: replay the DAG recovery file and copy it to the new attempt ----
    FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
        previousAttemptRecoveryDataDir, lastInProgressDAG);
    if (dagRecoveryStream == null) {
      // Could not find data to recover
      // Error out
      throw new IOException("Could not find recovery data for last in progress DAG"
          + ", dagId=" + lastInProgressDAG);
    }

    FSDataOutputStream newDAGRecoveryStream = null;
    boolean dagDataCopied = false;
    try {
      LOG.info("Copying DAG data into Current Attempt directory"
          + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
          lastInProgressDAG));
      newDAGRecoveryStream =
          getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);

      boolean skipAllOtherEvents = false;
      while (true) {
        HistoryEvent event;
        try {
          event = getNextEvent(dagRecoveryStream);
          if (event == null) {
            LOG.info("Reached end of dag recovery stream");
            break;
          }
        } catch (EOFException eof) {
          LOG.info("Reached end of dag recovery stream");
          break;
        } catch (IOException ioe) {
          // A truncated/corrupt tail is tolerated: keep what was replayed so far.
          LOG.warn("Corrupt data found when trying to read next event", ioe);
          break;
        }
        if (skipAllOtherEvents) {
          // The previous event told us to stop; the event just read is dropped
          // and not copied into the new recovery file.
          break;
        }
        HistoryEventType eventType = event.getEventType();
        switch (eventType) {
          case DAG_SUBMITTED:
          {
            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
            logRecoveringFromEvent(eventType, event);
            recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
                lastInProgressDAG);
            recoveredDAGData.cumulativeAdditionalResources = submittedEvent
              .getCumulativeAdditionalLocalResources();
            recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
            dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
            if (recoveredDAGData.nonRecoverable) {
              // Only the DAG shell is needed when the DAG cannot be recovered.
              skipAllOtherEvents = true;
            }
            break;
          }
          // DAG-level events that are handed to the DAG unchanged.
          case DAG_INITIALIZED:
          case DAG_STARTED:
          case DAG_COMMIT_STARTED:
          case VERTEX_GROUP_COMMIT_STARTED:
          case VERTEX_GROUP_COMMIT_FINISHED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            break;
          }
          case DAG_FINISHED:
          {
            logRecoveringFromEvent(eventType, event);
            // If this is seen, nothing to recover
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            recoveredDAGData.isCompleted = true;
            recoveredDAGData.dagState =
                ((DAGFinishedEvent) event).getState();
            skipAllOtherEvents = true;
            break;
          }
          case CONTAINER_LAUNCHED:
          {
            // Nothing to do for now
            break;
          }
          case VERTEX_INITIALIZED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_STARTED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            VertexStartedEvent vEvent = (VertexStartedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_PARALLELISM_UPDATED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_COMMIT_STARTED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_FINISHED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case TASK_STARTED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            TaskStartedEvent tEvent = (TaskStartedEvent) event;
            Task task = recoveredDAGData.recoveredDAG.getVertex(
                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case TASK_FINISHED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
            Task task = recoveredDAGData.recoveredDAG.getVertex(
                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case TASK_ATTEMPT_STARTED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
            // Task-attempt events are restored through the owning task.
            Task task =
                recoveredDAGData.recoveredDAG.getVertex(
                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
                        .getTask(tEvent.getTaskAttemptID().getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case TASK_ATTEMPT_FINISHED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
            Task task =
                recoveredDAGData.recoveredDAG.getVertex(
                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
                    .getTask(tEvent.getTaskAttemptID().getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
          {
            logRecoveringFromEvent(eventType, event);
            assert recoveredDAGData.recoveredDAG != null;
            VertexDataMovementEventsGeneratedEvent vEvent =
                (VertexDataMovementEventsGeneratedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          default:
            throw new RuntimeException("Invalid data found, unknown event type "
                + eventType);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("[DAG RECOVERY]"
              + " dagId=" + lastInProgressDAG
              + ", eventType=" + eventType
              + ", event=" + event.toString());
        }
        // Copy the replayed event: ordinal of the type followed by its proto form.
        newDAGRecoveryStream.writeInt(eventType.ordinal());
        event.toProtoStream(newDAGRecoveryStream);
      }
      dagRecoveryStream.close();
      newDAGRecoveryStream.hsync();
      newDAGRecoveryStream.close();
      dagDataCopied = true;
    } finally {
      // Failure path only: release the streams without masking the original error.
      if (!dagDataCopied) {
        closeStreamQuietly(dagRecoveryStream);
        closeStreamQuietly(newDAGRecoveryStream);
      }
    }

    // ---- Phase 3: replay summary events that were buffered for this DAG ----
    if (!recoveredDAGData.isCompleted
        && !recoveredDAGData.nonRecoverable) {
      if (lastInProgressDAGData.bufferedSummaryEvents != null
        && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
        for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
          assert recoveredDAGData.recoveredDAG != null;
          switch (bufferedEvent.getEventType()) {
            case VERTEX_GROUP_COMMIT_STARTED:
            case VERTEX_GROUP_COMMIT_FINISHED:
              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
              break;
            case VERTEX_FINISHED:
            {
              VertexFinishedEvent vertexFinishedEvent =
                  (VertexFinishedEvent) bufferedEvent;
              Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
                  vertexFinishedEvent.getVertexID());
              if (vertex == null) {
                // The summary says the vertex finished but the DAG has no such vertex.
                recoveredDAGData.nonRecoverable = true;
                recoveredDAGData.reason = "All state could not be recovered"
                    + ", vertex completed but events not flushed"
                    + ", vertexId=" + vertexFinishedEvent.getVertexID();
              } else {
                vertex.restoreFromEvent(vertexFinishedEvent);
              }
              break;
            }
            default:
              throw new RuntimeException("Invalid data found in buffered summary events"
                  + ", unknown event type "
                  + bufferedEvent.getEventType());
          }
        }
      }
    }

    LOG.info("Finished copying data from previous attempt into current attempt");
    createDataRecoveredFlagFile();

    return recoveredDAGData;
  }

  /** Logs the standard "Recovering from event" line for a replayed event. */
  private void logRecoveringFromEvent(HistoryEventType eventType, HistoryEvent event) {
    LOG.info("Recovering from event"
        + ", eventType=" + eventType
        + ", event=" + event.toString());
  }

  /**
   * Closes {@code stream} if it is non-null, logging instead of throwing so that
   * an exception already in flight is not replaced by a close failure.
   */
  private void closeStreamQuietly(java.io.Closeable stream) {
    if (stream == null) {
      return;
    }
    try {
      stream.close();
    } catch (IOException e) {
      LOG.warn("Failed to close recovery stream after an earlier failure", e);
    }
  }