in tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java [332:444]
/**
 * Evaluates a task as a candidate for speculative execution and returns either
 * one of the sentinel codes or a numeric speculation value.
 *
 * <p>Outcomes, in the order they are checked:
 * <ul>
 *   <li>{@code NOT_RUNNING} - the task has already succeeded (it is also dropped
 *       from {@code mayHaveSpeculated}), or no attempt is RUNNING/STARTING.</li>
 *   <li>{@code ON_SCHEDULE} - the estimator's threshold runtime is
 *       {@code Long.MAX_VALUE}, or, in timeout mode, the active attempt has not
 *       yet exceeded {@code taskTimeout}.</li>
 *   <li>{@code ALREADY_SPECULATING} - more than one attempt is RUNNING/STARTING.</li>
 *   <li>{@code TOO_NEW} - the attempt's enrolled time is later than {@code now}.</li>
 *   <li>{@code PROGRESS_IS_GOOD} - the estimated end time is already before
 *       {@code now}.</li>
 *   <li>{@code TOO_LATE_TO_SPECULATE} - a replacement attempt is estimated to end
 *       no earlier than the current attempt.</li>
 *   <li>Otherwise: {@code Long.MAX_VALUE} in timeout mode once the attempt has
 *       timed out, or the estimated end time of the current attempt minus the
 *       estimated end time of a replacement attempt.</li>
 * </ul>
 *
 * <p>As a side effect, in estimate mode the per-attempt entry in
 * {@code runningTaskAttemptStatistics} is created or refreshed, and a status
 * update is issued for the attempt when its statistics are unchanged and it has
 * not heartbeated for a while or the estimator reports stagnated progress.
 *
 * @param task the task to evaluate
 * @param now the current time used for all comparisons in this evaluation
 * @param shouldUseTimeout {@code true} to decide purely on elapsed time versus
 *        {@code taskTimeout}; {@code false} to decide on runtime estimates
 * @return a sentinel code or the computed speculation value as described above
 */
private long speculationValue(Task task, long now, boolean shouldUseTimeout) {
  final Map<TezTaskAttemptID, TaskAttempt> attemptsById = task.getAttempts();
  final TezTaskID taskID = task.getTaskID();

  // Completed tasks are short-circuited; also forget any speculation record.
  if (task.getState() == TaskState.SUCCEEDED) {
    mayHaveSpeculated.remove(taskID);
    return NOT_RUNNING;
  }

  // Long.MIN_VALUE marks "threshold not looked up yet".
  long threshold = Long.MIN_VALUE;
  if (!(mayHaveSpeculated.contains(taskID) || shouldUseTimeout)) {
    threshold = estimator.thresholdRuntime(taskID);
    if (threshold == Long.MAX_VALUE) {
      return ON_SCHEDULE;
    }
  }

  long value = Long.MIN_VALUE;
  int activeAttempts = 0;

  for (TaskAttempt attempt : attemptsById.values()) {
    final TaskAttemptState state = attempt.getState();
    if (state != TaskAttemptState.RUNNING
        && state != TaskAttemptState.STARTING) {
      // Only RUNNING/STARTING attempts are of interest.
      continue;
    }

    activeAttempts++;
    if (activeAttempts > 1) {
      return ALREADY_SPECULATING;
    }

    final TezTaskAttemptID attemptID = attempt.getTaskAttemptID();
    final long enrolledAt = estimator.attemptEnrolledTime(attemptID);
    if (enrolledAt > now) {
      // This background pass ran before the status change that records the
      // attempt start could be processed.
      return TOO_NEW;
    }

    if (shouldUseTimeout) {
      if ((now - enrolledAt) <= taskTimeout) {
        // Not timed out yet, nothing to do for this task.
        return ON_SCHEDULE;
      }
      // Timed out: a speculation should be scheduled right away, but keep
      // scanning because another attempt may already be running.
      value = Long.MAX_VALUE;
      continue;
    }

    final long runtimeEstimate = estimator.estimatedRuntime(attemptID);
    final long projectedEnd = runtimeEstimate + enrolledAt;
    final long replacementEnd = now + estimator.newAttemptEstimatedRuntime();
    final float progress = attempt.getProgress();

    final TaskAttemptHistoryStatistics stats =
        runningTaskAttemptStatistics.get(attemptID);
    if (stats == null) {
      // First observation of this attempt: start tracking it.
      runningTaskAttemptStatistics.put(attemptID,
          new TaskAttemptHistoryStatistics(runtimeEstimate, progress, now));
    } else if (runtimeEstimate != stats.getEstimatedRunTime()
        || progress != stats.getProgress()) {
      // Something moved since the last pass: record the new figures.
      stats.setEstimatedRunTime(runtimeEstimate);
      stats.setProgress(progress);
      stats.resetHeartBeatTime(now);
    } else if (stats.notHeartbeatedInAWhile(now)
        || estimator.hasStagnatedProgress(attemptID, now)) {
      // Figures are unchanged and have been stale for a while:
      // simulate a heart-beat for the attempt.
      statusUpdate(attempt.getTaskAttemptID(), attempt.getState(),
          clock.getTime());
    }

    if (projectedEnd < now) {
      return PROGRESS_IS_GOOD;
    }
    if (replacementEnd >= projectedEnd) {
      return TOO_LATE_TO_SPECULATE;
    }
    value = projectedEnd - replacementEnd;
  }

  // Reaching this point means at most one attempt was active.
  if (activeAttempts == 0) {
    return NOT_RUNNING;
  }

  // If the threshold was not looked up earlier (and we are not in timeout
  // mode), consult it now before reporting the computed value.
  if (threshold == Long.MIN_VALUE
      && !shouldUseTimeout
      && estimator.thresholdRuntime(taskID) == Long.MAX_VALUE) {
    return ON_SCHEDULE;
  }

  return value;
}