public DAGRecoveryData parseRecoveryData()

in tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java [657:915]


  /**
   * Parses the recovery summary files and the recovery files of the DAG selected for
   * recovery, and assembles the result into a {@link DAGRecoveryData}.
   *
   * <p>The method works in two phases:
   * <ol>
   *   <li>Every file returned by {@code getSummaryFiles()} is read as a sequence of delimited
   *       {@code SummaryEventProto} records. Each record is handed to the
   *       {@code DAGSummaryData} of its DAG. The highest DAG id seen is passed to
   *       {@code dagAppMaster.setDAGCounter}, and every DAG id seen is added to
   *       {@code dagAppMaster.dagIDs}.</li>
   *   <li>The DAG chosen by {@code getLastCompletedOrInProgressDAG} has its recovery files
   *       (from {@code getDAGRecoveryFiles}) replayed event by event into the returned
   *       {@code DAGRecoveryData}. Reading stops after a {@code DAG_FINISHED} event, or after
   *       the {@code DAG_SUBMITTED} event when the DAG is already marked non-recoverable.</li>
   * </ol>
   *
   * <p>Both the summary streams and the DAG recovery streams are closed even when parsing
   * fails part-way through.
   *
   * @return the recovered DAG data, or {@code null} when
   *         {@code getLastCompletedOrInProgressDAG} yields no DAG (or a DAG without an id)
   * @throws IOException if a summary record carries an invalid DAG id, if handling a summary
   *         event fails, or if the underlying file system calls fail
   * @throws IllegalArgumentException if a vertex/task/attempt event refers to a parent entity
   *         for which no recovery data has been recorded yet
   * @throws RuntimeException if a DAG recovery file contains an unhandled event type
   */
  public DAGRecoveryData parseRecoveryData() throws IOException {
    int dagCounter = 0;
    Map<TezDAGID, DAGSummaryData> dagSummaryDataMap = new HashMap<>();
    List<Path> summaryFiles = getSummaryFiles();
    LOG.debug("SummaryFile size:{}", summaryFiles.size());
    for (Path summaryFile : summaryFiles) {
      FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryFile);
      LOG.info("Parsing summary file"
          + ", path=" + summaryFile.toString()
          + ", len=" + summaryFileStatus.getLen()
          + ", lastModTime=" + summaryFileStatus.getModificationTime());
      // try-with-resources: the stream must be released even when a corrupt record makes us
      // throw from inside the loop.
      try (FSDataInputStream summaryStream = getSummaryStream(summaryFile, summaryFileStatus)) {
        while (true) {
          RecoveryProtos.SummaryEventProto proto;
          try {
            proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
            if (proto == null) {
              LOG.info("Reached end of summary stream");
              break;
            }
          } catch (EOFException eof) {
            LOG.info("Reached end of summary stream");
            break;
          }
          // The proto stores the event type as an index into HistoryEventType.values().
          HistoryEventType eventType =
              HistoryEventType.values()[proto.getEventType()];
          if (LOG.isDebugEnabled()) {
            LOG.debug("[RECOVERY SUMMARY]"
                + " dagId=" + proto.getDagId()
                + ", timestamp=" + proto.getTimestamp()
                + ", event=" + eventType);
          }
          TezDAGID dagId;
          try {
            dagId = TezDAGID.fromString(proto.getDagId());
          } catch (IllegalArgumentException e) {
            throw new IOException("Invalid dagId, summary records may be corrupted", e);
          }
          // Track the highest DAG id so the counter for new DAGs continues after it.
          if (dagCounter < dagId.getId()) {
            dagCounter = dagId.getId();
          }
          DAGSummaryData summaryData = dagSummaryDataMap.get(dagId);
          if (summaryData == null) {
            summaryData = new DAGSummaryData(dagId);
            dagSummaryDataMap.put(dagId, summaryData);
          }
          try {
            summaryData.handleSummaryEvent(proto);
          } catch (Exception e) {
            // any exception when parsing protobuf
            throw new IOException("Error when parsing summary event proto", e);
          }
        }
      }
    }

    // Set counter for next set of DAGs & update dagNames Set in DAGAppMaster
    dagAppMaster.setDAGCounter(dagCounter);
    for (DAGSummaryData dagSummaryData : dagSummaryDataMap.values()) {
      dagAppMaster.dagIDs.add(dagSummaryData.dagId.toString());
    }

    DAGSummaryData lastInProgressDAGData =
        getLastCompletedOrInProgressDAG(dagSummaryDataMap);
    if (lastInProgressDAGData == null) {
      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
      return null;
    }
    TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
    if (lastInProgressDAG == null) {
      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
      return null;
    }

    LOG.info("Checking if DAG is in recoverable state"
        + ", dagId=" + lastInProgressDAGData.dagId);

    final DAGRecoveryData recoveredDAGData = new DAGRecoveryData(lastInProgressDAGData);
    List<Path> dagRecoveryFiles = getDAGRecoveryFiles(lastInProgressDAG);
    boolean skipAllOtherEvents = false;
    Path lastRecoveryFile = null;
    // read the non summary events even when it is nonrecoverable. (Just read the DAGSubmittedEvent
    // to create the DAGImpl)
    for (Path dagRecoveryFile : dagRecoveryFiles) {
      if (skipAllOtherEvents) {
        LOG.warn("Other recovery files will be skipped due to error in the previous recovery file {}",
            lastRecoveryFile);
        break;
      }
      lastRecoveryFile = dagRecoveryFile;
      LOG.info("Trying to recover dag from recovery file, dagId={}, dagRecoveryFile={}", lastInProgressDAG,
          dagRecoveryFile);
      if (LOG.isDebugEnabled()) {
        FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
        LOG.debug("Recovery file details: {}", fileStatus);
      }

      // try-with-resources: event handling below can throw (precondition failures, unknown
      // event types, DAG creation), and the stream must not be leaked in that case.
      try (FSDataInputStream dagRecoveryStream =
               recoveryFS.open(dagRecoveryFile, recoveryBufferSize)) {
        CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream);
        codedInputStream.setSizeLimit(Integer.MAX_VALUE);
        while (true) {
          HistoryEvent event;
          try {
            event = getNextEvent(codedInputStream);
            if (event == null) {
              LOG.info("Reached end of dag recovery stream");
              break;
            }
          } catch (EOFException eof) {
            LOG.info("Reached end of dag recovery stream");
            break;
          } catch (IOException ioe) {
            // A corrupt record ends reading of this file only; events read so far are kept.
            LOG.warn("Corrupt data found when trying to read next event", ioe);
            break;
          }
          if (skipAllOtherEvents) {
            // hit an error - skip reading other events
            break;
          }

          HistoryEventType eventType = event.getEventType();
          LOG.info("Recovering from event"
              + ", eventType=" + eventType
              + ", event=" + event.toString());
          switch (eventType) {
            case DAG_SUBMITTED:
            {
              DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
              recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
                  lastInProgressDAG);
              recoveredDAGData.cumulativeAdditionalResources = submittedEvent
                  .getCumulativeAdditionalLocalResources();
              recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
              dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
              // For a non-recoverable DAG only the submitted event is needed (to create the DAG).
              if (recoveredDAGData.nonRecoverable) {
                skipAllOtherEvents = true;
              }
              break;
            }
            case DAG_INITIALIZED:
              recoveredDAGData.dagInitedEvent = (DAGInitializedEvent) event;
              break;
            case DAG_STARTED:
              recoveredDAGData.dagStartedEvent = (DAGStartedEvent) event;
              break;
            case DAG_FINISHED:
              recoveredDAGData.dagFinishedEvent = (DAGFinishedEvent) event;
              skipAllOtherEvents = true;
              break;
            case DAG_COMMIT_STARTED:
            case VERTEX_GROUP_COMMIT_STARTED:
            case VERTEX_GROUP_COMMIT_FINISHED:
            case CONTAINER_LAUNCHED:
            case DAG_KILL_REQUEST:
            case VERTEX_COMMIT_STARTED:
            {
              // Nothing to do for now
              break;
            }
            case VERTEX_INITIALIZED:
            {
              VertexInitializedEvent vertexInitEvent = (VertexInitializedEvent) event;
              VertexRecoveryData vertexRecoveryData =
                  recoveredDAGData.maybeCreateVertexRecoveryData(vertexInitEvent.getVertexID());
              vertexRecoveryData.vertexInitedEvent = vertexInitEvent;
              break;
            }
            case VERTEX_CONFIGURE_DONE:
            {
              VertexConfigurationDoneEvent reconfigureDoneEvent =
                  (VertexConfigurationDoneEvent) event;
              VertexRecoveryData vertexRecoveryData =
                  recoveredDAGData.maybeCreateVertexRecoveryData(reconfigureDoneEvent.getVertexID());
              vertexRecoveryData.vertexConfigurationDoneEvent = reconfigureDoneEvent;
              break;
            }
            case VERTEX_STARTED:
            {
              VertexStartedEvent vertexStartedEvent = (VertexStartedEvent) event;
              VertexRecoveryData vertexRecoveryData =
                  recoveredDAGData.vertexRecoveryDataMap.get(vertexStartedEvent.getVertexID());
              Preconditions.checkArgument(vertexRecoveryData != null,
                  "No VertexInitializedEvent before VertexStartedEvent");
              vertexRecoveryData.vertexStartedEvent = vertexStartedEvent;
              break;
            }
            case VERTEX_FINISHED:
            {
              VertexFinishedEvent vertexFinishedEvent = (VertexFinishedEvent) event;
              VertexRecoveryData vertexRecoveryData =
                  recoveredDAGData.maybeCreateVertexRecoveryData(vertexFinishedEvent.getVertexID());
              vertexRecoveryData.vertexFinishedEvent = vertexFinishedEvent;
              break;
            }
            case TASK_STARTED:
            {
              TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
              VertexRecoveryData vertexRecoveryData =
                  recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID());
              Preconditions.checkArgument(vertexRecoveryData != null,
                  "Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID());
              TaskRecoveryData taskRecoveryData =
                  vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
              taskRecoveryData.taskStartedEvent = taskStartedEvent;
              break;
            }
            case TASK_FINISHED:
            {
              TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
              VertexRecoveryData vertexRecoveryData =
                  recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID());
              Preconditions.checkArgument(vertexRecoveryData != null,
                  "Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID());
              TaskRecoveryData taskRecoveryData =
                  vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
              taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
              break;
            }
            case TASK_ATTEMPT_STARTED:
            {
              TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent) event;
              VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
                  taStartedEvent.getVertexID());
              Preconditions.checkArgument(vertexRecoveryData != null,
                  "Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
              TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
                  .get(taStartedEvent.getTaskAttemptID().getTaskID());
              Preconditions.checkArgument(taskRecoveryData != null,
                  "Invalid TaskAttemptStartedEvent, its taskId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
              TaskAttemptRecoveryData taRecoveryData =
                  taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taStartedEvent.getTaskAttemptID());
              taRecoveryData.taStartedEvent = taStartedEvent;
              break;
            }
            case TASK_ATTEMPT_FINISHED:
            {
              TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent) event;
              VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
                  taFinishedEvent.getVertexID());
              Preconditions.checkArgument(vertexRecoveryData != null,
                  "Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
              TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
                  .get(taFinishedEvent.getTaskAttemptID().getTaskID());
              Preconditions.checkArgument(taskRecoveryData != null,
                  "Invalid TaskAttemptFinishedEvent, its taskId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
              TaskAttemptRecoveryData taRecoveryData =
                  taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taFinishedEvent.getTaskAttemptID());
              taRecoveryData.taFinishedEvent = taFinishedEvent;
              break;
            }
            default:
              throw new RuntimeException("Invalid data found, unknown event type "
                  + eventType);
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("[DAG RECOVERY]"
                + " dagId=" + lastInProgressDAG
                + ", eventType=" + eventType
                + ", event=" + event.toString());
          }
        }
      }
    }
    recoveredDAGData.checkRecoverableNonSummary();
    return recoveredDAGData;
  }