in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java [1775:1886]
/**
 * Handles a report that a downstream (destination) task attempt failed to read output
 * produced by {@code sourceAttempt}.
 *
 * <p>The blame is recorded in two places:
 * <ul>
 *   <li>the destination host is added to the vertex-wide set of hosts blaming the source host
 *       ({@code getVertex().getDownstreamBlamingHosts()});</li>
 *   <li>the destination attempt is recorded, together with the time of its first report, in
 *       {@code uniquefailedOutputReports}.</li>
 * </ul>
 *
 * <p>The source attempt is failed when any of the following holds:
 * <ul>
 *   <li>the time since the first report from this destination attempt has reached
 *       {@code getMaxAllowedTimeForTaskReadErrorSec()};</li>
 *   <li>the number of unique failing destination attempts, divided by the number of running
 *       tasks in the destination vertex, exceeds {@code getMaxAllowedOutputFailuresFraction()};</li>
 *   <li>the number of unique failing destination attempts has reached
 *       {@code getMaxAllowedOutputFailures()};</li>
 *   <li>the read error is flagged as a local fetch or as a disk error at the source;</li>
 *   <li>the number of distinct downstream hosts blaming the source host, divided by
 *       {@code getNumNodes(sourceAttempt)}, exceeds
 *       {@code getMaxAllowedDownstreamHostFailuresFraction()}.</li>
 * </ul>
 * Otherwise the attempt is left in its current internal state.
 *
 * <p>When failing, a diagnostic message is logged and attached to the attempt, and
 * {@code sendInputFailedToConsumers()} is called before the terminal transition runs.
 *
 * @param sourceAttempt the attempt whose output could not be read
 * @param event expected to be a {@code TaskAttemptEventOutputFailed} wrapping an
 *     {@code InputReadErrorEvent}
 * @return the current internal state if the attempt is not failed; {@code FAILED} if the
 *     attempt had already {@code SUCCEEDED}; otherwise {@code FAIL_IN_PROGRESS}
 * @throws TezUncheckedException if the read error's version does not equal
 *     {@code sourceAttempt.getTaskAttemptID().getId()}
 */
public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
    TaskAttemptEvent event) {
  TaskAttemptEventOutputFailed outputFailedEvent =
      (TaskAttemptEventOutputFailed) event;
  TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
  TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();
  InputReadErrorEvent readErrorEvent = (InputReadErrorEvent) inputFailedEvent.getEvent();
  int failedInputIndexOnDestTa = readErrorEvent.getIndex();
  if (readErrorEvent.getVersion() != sourceAttempt.getTaskAttemptID().getId()) {
    // The read error's version does not match this attempt's id, so this attempt must not
    // be blamed for it.
    throw new TezUncheckedException(sourceAttempt.getTaskAttemptID()
        + " incorrectly blamed for read error from " + failedDestTaId
        + " at inputIndex " + failedInputIndexOnDestTa + " version "
        + readErrorEvent.getVersion());
  }

  // source host: where the data input is supposed to come from
  String sHost = sourceAttempt.getNodeId().getHost();
  // destination host: where the data is tried to be fetched to
  String dHost = readErrorEvent.getDestinationLocalhostName();
  LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}",
      sourceAttempt.getTaskAttemptID(), sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);

  // Record, in the vertex-wide map, that dHost blamed sHost. A single lookup is used, and the
  // resulting set is reused below instead of being fetched from the map again.
  Map<String, Set<String>> downstreamBlamingHosts =
      sourceAttempt.getVertex().getDownstreamBlamingHosts();
  Set<String> hostsBlamingSource = downstreamBlamingHosts.get(sHost);
  if (hostsBlamingSource == null) {
    LOG.info("Host {} is blamed for fetch failure from {} for the first time", sHost, dHost);
    hostsBlamingSource = new HashSet<String>();
    downstreamBlamingHosts.put(sHost, hostsBlamingSource);
  }
  hostsBlamingSource.add(dHost);

  int currentNumberOfFailingDownstreamHosts = hostsBlamingSource.size();
  int numNodes = getNumNodes(sourceAttempt);
  float hostFailureFraction =
      numNodes > 0 ? ((float) currentNumberOfFailingDownstreamHosts) / numNodes : 0;
  double maxAllowedHostFailureFraction = sourceAttempt.getVertex().getVertexConfig()
      .getMaxAllowedDownstreamHostFailuresFraction();
  boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost =
      hostFailureFraction > maxAllowedHostFailureFraction;
  if (tooManyDownstreamHostsBlamedTheSameUpstreamHost) {
    LOG.info("Host will be marked fail: {} because of host failure fraction {} is beyond the limit {}", sHost,
        hostFailureFraction, maxAllowedHostFailureFraction);
  }

  // Remember when this particular destination attempt first reported a read error, so the
  // time-based deadline below is measured from that first report.
  long time = sourceAttempt.clock.getTime();
  Long firstErrReportTime = sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
  if (firstErrReportTime == null) {
    sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
    firstErrReportTime = time;
  }

  int maxAllowedOutputFailures = sourceAttempt.getVertex().getVertexConfig()
      .getMaxAllowedOutputFailures();
  int maxAllowedTimeForTaskReadErrorSec = sourceAttempt.getVertex()
      .getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec();
  double maxAllowedOutputFailuresFraction = sourceAttempt.getVertex()
      .getVertexConfig().getMaxAllowedOutputFailuresFraction();

  int readErrorTimespanSec = (int) ((time - firstErrReportTime) / 1000);
  boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec;
  int runningTasks = sourceAttempt.appContext.getCurrentDAG().getVertex(
      failedDestTaId.getVertexID()).getRunningTasks();
  float failureFraction =
      runningTasks > 0 ? ((float) sourceAttempt.uniquefailedOutputReports.size()) / runningTasks : 0;
  boolean withinFailureFractionLimits =
      (failureFraction <= maxAllowedOutputFailuresFraction);
  boolean withinOutputFailureLimits =
      (sourceAttempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures);

  // If needed we can launch a background task without failing this task
  // to generate a copy of the output just in case.
  // If needed we can consider only running consumer tasks
  if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits
      && !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())
      && !tooManyDownstreamHostsBlamedTheSameUpstreamHost) {
    return sourceAttempt.getInternalState();
  }

  String message = sourceAttempt.getTaskAttemptID() + " being failed for too many output errors. "
      + "failureFraction=" + failureFraction
      + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION="
      + maxAllowedOutputFailuresFraction
      + ", uniquefailedOutputReports=" + sourceAttempt.uniquefailedOutputReports.size()
      + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
      + ", hostFailureFraction=" + hostFailureFraction
      + " (" + currentNumberOfFailingDownstreamHosts + " / " + numNodes + ")"
      + ", MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION="
      + maxAllowedHostFailureFraction
      + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
      + maxAllowedTimeForTaskReadErrorSec
      + ", readErrorTimespan=" + readErrorTimespanSec
      + ", isLocalFetch=" + readErrorEvent.isLocalFetch()
      + ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource();
  LOG.info(message);
  sourceAttempt.addDiagnosticInfo(message);
  // send input failed event
  sourceAttempt.sendInputFailedToConsumers();

  // TODO at some point. Nodes may be interested in FetchFailure info.
  // Can be used to blacklist nodes.

  // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
  if (sourceAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
    new TerminatedAfterSuccessHelper(FAILED_HELPER).transition(sourceAttempt, event);
    return TaskAttemptStateInternal.FAILED;
  }
  new TerminatedWhileRunningTransition(FAILED_HELPER).transition(sourceAttempt, event);
  return TaskAttemptStateInternal.FAIL_IN_PROGRESS;
}