private void createCriticalPath()

in tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java [435:616]


  /**
   * Builds the critical path by walking backwards from {@code lastAttempt} and appends the
   * resulting steps to {@code criticalPath}.
   *
   * <p>The walk first records a {@code DAG_COMMIT} step, then repeatedly records an
   * {@code ATTEMPT} step for the current attempt and picks its predecessor: the producer of the
   * last data event (data dependency), the attempt that caused this one to be scheduled
   * (retry / output re-creation dependency), or nothing (init dependency). When no predecessor
   * is found a {@code VERTEX_INIT} step is recorded and the walk ends. Steps are collected in
   * discovery order (commit first) and copied into {@code criticalPath} in reverse, so the
   * vertex init step is added first and the commit step last.
   *
   * <p>Does nothing when {@code lastAttempt} is {@code null}.
   *
   * @param dagInfo supplies the DAG start and finish times used for the init and commit steps
   * @param lastAttempt attempt from which the backwards walk starts; may be {@code null}
   * @param lastAttemptFinishTime time at which {@code lastAttempt} stops being critical; must be
   *        greater than zero
   * @param attempts lookup from task attempt id to attempt info, used to resolve predecessors
   * @throws IllegalStateException if any of the consistency checks on the walk fails, including
   *         a predecessor id that cannot be resolved through {@code attempts}
   */
  private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt,
      long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) {
    if (lastAttempt == null) {
      return;
    }

    // steps in discovery order, i.e. from the end of the DAG towards its start
    List<CriticalPathStep> tempCP = Lists.newLinkedList();
    TaskAttemptInfo currentAttempt = lastAttempt;
    CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT);
    long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;

    // add the commit step
    if (dagInfo.getFinishTime() > 0) {
      currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
    } else {
      // AM crashed and no dag finished written
      currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
    }
    currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
    currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
    tempCP.add(currentStep);

    while (true) {
      Preconditions.checkState(currentAttempt != null);
      Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
      LOG.debug("Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());

      currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
      currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;

      // consider the last data event seen immediately preceding the current critical path
      // stop time for this attempt
      long currentStepLastDataEventTime = 0;
      String currentStepLastDataTA = null;
      DataDependencyEvent item =
          currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime);
      if (item != null) {
        currentStepLastDataEventTime = item.getTimestamp();
        currentStepLastDataTA = item.getTaskAttemptId();
      }

      // sanity check: an attempt may only re-appear on the path when it received more than one
      // data event and the event being followed now is earlier than its latest one
      for (CriticalPathStep previousStep : tempCP) {
        if (previousStep.type == EntityType.ATTEMPT
            && previousStep.attempt.getTaskAttemptId().equals(
                currentAttempt.getTaskAttemptId())) {
          // found loop.
          // this should only happen for read errors in currentAttempt
          List<DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents();
          // received original and retry data events
          Preconditions.checkState(dataEvents.size() > 1);
          // new event is earlier than last
          Preconditions.checkState(currentStepLastDataEventTime
              < dataEvents.get(dataEvents.size() - 1).getTimestamp());
        }
      }

      tempCP.add(currentStep);

      // find the next attempt on the critical path
      // predecessor is a data dependency when the last data event arrived after the attempt
      // was created
      boolean dataDependency = currentStepLastDataEventTime > currentAttempt.getCreationTime();

      long startCriticalPathTime = 0;
      String nextAttemptId = null;
      CriticalPathDependency reason = null;
      if (dataDependency) {
        // last data event was produced after the attempt was scheduled. use
        // data dependency
        // typically the case when scheduling ahead of time
        LOG.debug("Has data dependency");
        if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
          // there is a valid data causal TA. Use it.
          nextAttemptId = currentStepLastDataTA;
          reason = CriticalPathDependency.DATA_DEPENDENCY;
          startCriticalPathTime = currentStepLastDataEventTime;
          LOG.debug("Using data dependency " + nextAttemptId);
        } else {
          // there is no valid data causal TA. This means data event came from the same vertex
          VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo();
          Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(),
              "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
                  + "TA is null for " + currentAttempt.getTaskAttemptId());
          nextAttemptId = null;
          reason = CriticalPathDependency.INIT_DEPENDENCY;
          LOG.debug("Using init dependency");
        }
      } else {
        // attempt was scheduled after last data event. use scheduling dependency
        // typically happens for retries
        LOG.debug("Has scheduling dependency");
        if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
          // there is a scheduling causal TA. Use it.
          nextAttemptId = currentAttempt.getCreationCausalTA();
          reason = CriticalPathDependency.RETRY_DEPENDENCY;
          TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
          // guard on the looked-up attempt (not on the id, which is known to be non-empty here)
          // so that an unresolvable id is reported by the checkState below instead of an NPE
          if (nextAttempt != null) {
            VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
            VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
            if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())) {
              // cause from different vertex. Might be rerun to re-generate outputs
              for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
                if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
                  // next vertex is an output vertex
                  reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
                  break;
                }
              }
            }
          }
          if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
            // rescheduled due to read error. start critical at read error report time.
            // for now proxy own creation time for read error report time
            startCriticalPathTime = currentAttempt.getCreationTime();
          } else {
            // rescheduled due to own previous attempt failure
            // we are critical when the previous attempt fails
            Preconditions.checkState(nextAttempt != null,
                "No attempt found for creation causal TA " + nextAttemptId);
            Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals(
                currentAttempt.getTaskInfo().getTaskId()));
            startCriticalPathTime = nextAttempt.getFinishTime();
          }
          LOG.debug("Using scheduling dependency " + nextAttemptId);
        } else {
          // there is no scheduling causal TA.
          if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
            // there is a data event going to the vertex. Count the time between data event and
            // creation time as Initializer/Manager overhead and follow data dependency
            nextAttemptId = currentStepLastDataTA;
            reason = CriticalPathDependency.DATA_DEPENDENCY;
            startCriticalPathTime = currentStepLastDataEventTime;
            long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime;
            currentStep.notes
                .add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead));
            LOG.debug("Using data dependency " + nextAttemptId);
          } else {
            // there is no scheduling causal TA and no data event causal TA.
            // the vertex has external input that sent the last data events
            // or the vertex has external input but does not use events
            // or the vertex has no external inputs or edges
            nextAttemptId = null;
            reason = CriticalPathDependency.INIT_DEPENDENCY;
            LOG.debug("Using init dependency");
          }
        }
      }

      currentStep.startCriticalPathTime = startCriticalPathTime;
      currentStep.reason = reason;

      Preconditions.checkState(
          currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime);

      if (Strings.isNullOrEmpty(nextAttemptId)) {
        Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY));
        Preconditions.checkState(startCriticalPathTime == 0);
        // no predecessor attempt found. this is the last step in the critical path
        // assume attempts start critical path time is when its scheduled. before that is
        // vertex initialization time
        currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime();

        // add vertex init step
        long initStepStopCriticalTime = currentStep.startCriticalPathTime;
        currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
        currentStep.stopCriticalPathTime = initStepStopCriticalTime;
        currentStep.startCriticalPathTime = dagInfo.getStartTime();
        currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY;
        tempCP.add(currentStep);

        // publish the steps in reverse discovery order; iterating a reversed view avoids
        // repeated positional lookups on the linked list
        for (CriticalPathStep step : Lists.reverse(tempCP)) {
          criticalPath.add(step);
        }
        return;
      }

      // continue the walk from the predecessor; it stops being critical where the current
      // step starts being critical
      currentAttempt = attempts.get(nextAttemptId);
      currentAttemptStopCriticalPathTime = startCriticalPathTime;
    }
  }