private long speculationValue()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java [332:444]


  /**
   * Scores how worthwhile it would be to launch a speculative attempt for
   * {@code task} at time {@code now}.
   *
   * <p>When exactly one attempt is RUNNING/STARTING and no early exit applies,
   * the result is either the estimated time saved by a replacement attempt
   * (running attempt's projected end minus a fresh attempt's projected end), or
   * {@code Long.MAX_VALUE} in timeout mode when that attempt has been enrolled
   * for longer than {@code taskTimeout}. Otherwise one of these codes is
   * returned:
   * <ul>
   *   <li>{@code NOT_RUNNING}: the task SUCCEEDED, or no attempt is
   *       RUNNING/STARTING;</li>
   *   <li>{@code ON_SCHEDULE}: (estimate mode) the estimator's threshold runtime
   *       is {@code Long.MAX_VALUE}, or (timeout mode) the attempt has not yet
   *       exceeded {@code taskTimeout};</li>
   *   <li>{@code ALREADY_SPECULATING}: more than one attempt is
   *       RUNNING/STARTING;</li>
   *   <li>{@code TOO_NEW}: the attempt's enrolled time is later than
   *       {@code now};</li>
   *   <li>{@code PROGRESS_IS_GOOD}: (estimate mode) the projected end time is
   *       already before {@code now};</li>
   *   <li>{@code TOO_LATE_TO_SPECULATE}: (estimate mode) a replacement would
   *       not finish before the running attempt's projected end.</li>
   * </ul>
   *
   * <p>Side effects: a SUCCEEDED task is removed from
   * {@code mayHaveSpeculated}. In estimate mode the attempt's entry in
   * {@code runningTaskAttemptStatistics} is created or refreshed. When the
   * estimate and progress are unchanged, a status update is simulated if the
   * attempt has not heartbeated in a while or its progress has stagnated.
   *
   * @param task the task to evaluate
   * @param now the evaluation time, compared against attempt enrolled times
   * @param shouldUseTimeout if true, judge the attempt purely by elapsed time
   *     against {@code taskTimeout} instead of runtime estimates
   * @return a speculation score, or one of the status codes listed above
   */
  private long speculationValue(Task task, long now, boolean shouldUseTimeout) {
    final Map<TezTaskAttemptID, TaskAttempt> attemptsById = task.getAttempts();
    final TezTaskID id = task.getTaskID();

    // Finished work never needs a backup attempt; forget any earlier
    // speculation bookkeeping for it.
    if (task.getState() == TaskState.SUCCEEDED) {
      mayHaveSpeculated.remove(id);
      return NOT_RUNNING;
    }

    // Long.MIN_VALUE means "threshold not obtained yet".
    long threshold = Long.MIN_VALUE;
    boolean checkThresholdFirst =
        !mayHaveSpeculated.contains(id) && !shouldUseTimeout;
    if (checkThresholdFirst) {
      threshold = estimator.thresholdRuntime(id);
      if (threshold == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    long value = Long.MIN_VALUE;
    int liveAttempts = 0;

    for (TaskAttempt attempt : attemptsById.values()) {
      TaskAttemptState state = attempt.getState();
      boolean live = state == TaskAttemptState.RUNNING
          || state == TaskAttemptState.STARTING;
      if (!live) {
        continue;
      }
      liveAttempts++;
      if (liveAttempts > 1) {
        return ALREADY_SPECULATING;
      }

      TezTaskAttemptID attemptId = attempt.getTaskAttemptID();
      long enrolledAt = estimator.attemptEnrolledTime(attemptId);
      if (enrolledAt > now) {
        // This background process ran before the status change recording the
        // attempt's start was processed.
        return TOO_NEW;
      }

      if (shouldUseTimeout) {
        if ((now - enrolledAt) > taskTimeout) {
          // Timed out: ask for speculation at top priority. Keep scanning
          // rather than returning, since another attempt may already be live.
          value = Long.MAX_VALUE;
          continue;
        }
        // Still within the timeout window.
        return ON_SCHEDULE;
      }

      long runtimeEstimate = estimator.estimatedRuntime(attemptId);
      long projectedEnd = enrolledAt + runtimeEstimate;
      long replacementEnd = now + estimator.newAttemptEstimatedRuntime();
      float currentProgress = attempt.getProgress();

      TaskAttemptHistoryStatistics history =
          runningTaskAttemptStatistics.get(attemptId);
      if (history == null) {
        runningTaskAttemptStatistics.put(attemptId,
            new TaskAttemptHistoryStatistics(runtimeEstimate, currentProgress,
                now));
      } else if (runtimeEstimate != history.getEstimatedRunTime()
          || currentProgress != history.getProgress()) {
        // Fresh numbers: record them and restart the heartbeat clock.
        history.setEstimatedRunTime(runtimeEstimate);
        history.setProgress(currentProgress);
        history.resetHeartBeatTime(now);
      } else if (history.notHeartbeatedInAWhile(now)
          || estimator.hasStagnatedProgress(attemptId, now)) {
        // Numbers are unchanged and have gone stale: simulate a heartbeat.
        statusUpdate(attempt.getTaskAttemptID(), attempt.getState(),
            clock.getTime());
      }

      if (projectedEnd < now) {
        return PROGRESS_IS_GOOD;
      }
      if (replacementEnd >= projectedEnd) {
        return TOO_LATE_TO_SPECULATE;
      }
      value = projectedEnd - replacementEnd;
    }

    // Reaching this point means at most one attempt was live.
    if (liveAttempts == 0) {
      return NOT_RUNNING;
    }

    // The threshold may not have been obtained up front (e.g. the task was in
    // mayHaveSpeculated); in estimate mode it must still be consulted now.
    if (threshold == Long.MIN_VALUE && !shouldUseTimeout
        && estimator.thresholdRuntime(id) == Long.MAX_VALUE) {
      return ON_SCHEDULE;
    }
    return value;
  }