private void analyzeAllocationOverhead()

in tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java [259:356]


  /**
   * Annotates attempt steps on the critical path with notes about the time the attempt
   * spent between creation and container allocation.
   *
   * <p>Only steps of type {@code EntityType.ATTEMPT} whose allocation time is not earlier
   * than the step's {@code startCriticalPathTime} are analyzed, and only when the attempt
   * has a container. For such a step the method:
   * <ul>
   *   <li>walks the other attempts mapped to the same container (ordered by allocation
   *       time, up to the current attempt) to count re-use by the same vertex and to add
   *       up how long the container was held by others after this attempt was created;</li>
   *   <li>adds a note for every internally preempted attempt of a different vertex that
   *       finished strictly between this attempt's creation and allocation.</li>
   * </ul>
   * Results are recorded by appending to {@code step.notes}; nothing is returned.
   *
   * @param dag the DAG whose vertices, tasks, attempts and container mapping are inspected
   */
  private void analyzeAllocationOverhead(DagInfo dag) {
    // Compare constant-first so an attempt without a termination cause cannot cause an NPE.
    final String preemptionCause = TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name();
    List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
    for (VertexInfo v : dag.getVertices()) {
      for (TaskInfo t : v.getTasks()) {
        for (TaskAttemptInfo a : t.getTaskAttempts()) {
          if (preemptionCause.equals(a.getTerminationCause())) {
            LOG.debug("Found preempted attempt " + a.getTaskAttemptId());
            preemptedAttempts.add(a);
          }
        }
      }
    }

    // Iterate instead of indexing so the walk stays linear even for a non-random-access list.
    for (CriticalPathStep step : criticalPath) {
      if (step.getType() != EntityType.ATTEMPT) {
        continue;
      }
      TaskAttemptInfo attempt = step.attempt;

      long creationTime = attempt.getCreationTime();
      long allocationTime = attempt.getAllocationTime();
      long finishTime = attempt.getFinishTime();
      if (allocationTime < step.startCriticalPathTime) {
        // allocated before it became critical
        continue;
      }

      // the attempt is critical before allocation. So allocation overhead needs analysis
      Container container = attempt.getContainer();
      if (container == null) {
        continue;
      }
      // the vertex of the attempt does not change while scanning, so look it up once
      VertexInfo attemptVertex = attempt.getTaskInfo().getVertexInfo();

      Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
      if (attempts != null && !attempts.isEmpty()) {
        // arrange attempts by allocation time
        List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
        Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
        // walk the list to record allocation time before the current attempt
        long containerPreviousAllocatedTime = 0;
        int reUsesForVertex = 1;
        for (TaskAttemptInfo containerAttempt : attemptsList) {
          if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
            break;
          }
          if (containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals(
              attemptVertex.getVertexId())) {
            // another task from the same vertex ran in this container. So there are multiple
            // waves for this vertex on this container.
            reUsesForVertex++;
          }
          long cAllocTime = containerAttempt.getAllocationTime();
          long cFinishTime = containerAttempt.getFinishTime();
          if (cFinishTime > creationTime) {
            // for containerAttempts that used the container while this attempt was waiting
            // add up time container was allocated to containerAttempt. Account for allocations
            // that started before this attempt was created by clamping the start to creationTime.
            containerPreviousAllocatedTime += cFinishTime - Math.max(cAllocTime, creationTime);
          }
        }
        int numVertexTasks = attemptVertex.getNumTasks();
        int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
        double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);

        if (reUsesForVertex > 1) {
          step.notes.add("Container ran multiple tasks for this vertex. ");
          if (numWaves < 1) {
            // less than 1 wave total but still ran more than 1 on this container
            step.notes.add("Vertex potentially seeing contention from other branches in the DAG. ");
          }
        }
        if (containerPreviousAllocatedTime == 0) {
          step.notes.add("Container newly allocated.");
        } else {
          long allocationWait = attempt.getCreationToAllocationTimeInterval();
          if (containerPreviousAllocatedTime >= allocationWait) {
            step.notes.add("Container was fully allocated");
          } else {
            step.notes.add("Container in use for " +
                SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
                SVGUtils.getTimeStr(allocationWait) +
                " of allocation wait time");
          }
        }
      }

      // look for internal preemptions while attempt was waiting for allocation
      for (TaskAttemptInfo a : preemptedAttempts) {
        if (a.getTaskInfo().getVertexInfo().getVertexId()
            .equals(attemptVertex.getVertexId())) {
          // dont preempt same vertex task. ideally this should look at priority but we dont have it
          continue;
        }
        long preemptedFinishTime = a.getFinishTime();
        if (preemptedFinishTime > creationTime && preemptedFinishTime < allocationTime) {
          // found an attempt that was preempted within this time interval
          step.notes.add("Potentially waited for preemption of " + a.getShortName());
        }
      }
    }
  }