public TaskAttemptStateInternal transition()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java [1775:1886]


    /**
     * Handles a consumer's report that it could not read this attempt's output, and decides
     * whether the blamed (source) attempt must be failed.
     *
     * <p>The source attempt is failed when any of the following holds:
     * <ul>
     *   <li>the time since {@code failedDestTaId} first reported a read error against this attempt
     *       is at least the vertex's {@code getMaxAllowedTimeForTaskReadErrorSec};</li>
     *   <li>the number of distinct consumer attempts that reported errors, divided by the consumer
     *       vertex's running tasks, exceeds {@code getMaxAllowedOutputFailuresFraction};</li>
     *   <li>the number of distinct consumer attempts that reported errors reaches
     *       {@code getMaxAllowedOutputFailures};</li>
     *   <li>the read error was a local fetch or a disk error at the source;</li>
     *   <li>the number of distinct downstream hosts that blamed this attempt's host, divided by the
     *       node count, exceeds {@code getMaxAllowedDownstreamHostFailuresFraction}.</li>
     * </ul>
     * Otherwise the attempt keeps its current internal state.
     *
     * @param sourceAttempt the attempt whose output could not be read
     * @param event a {@link TaskAttemptEventOutputFailed} wrapping the consumer's
     *     {@link InputReadErrorEvent}
     * @return the unchanged internal state if all limits are respected; {@code FAILED} if the
     *     attempt had already {@code SUCCEEDED}; otherwise {@code FAIL_IN_PROGRESS}
     * @throws TezUncheckedException if the read error's version does not match this attempt's id,
     *     i.e. this attempt was blamed by mistake
     */
    public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
        TaskAttemptEvent event) {
      TaskAttemptEventOutputFailed outputFailedEvent =
          (TaskAttemptEventOutputFailed) event;
      TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
      TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();

      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent) inputFailedEvent.getEvent();
      int failedInputIndexOnDestTa = readErrorEvent.getIndex();

      // The read error must carry this attempt's id as its version; otherwise the error was
      // attributed to the wrong attempt.
      if (readErrorEvent.getVersion() != sourceAttempt.getTaskAttemptID().getId()) {
        throw new TezUncheckedException(sourceAttempt.getTaskAttemptID()
            + " incorrectly blamed for read error from " + failedDestTaId
            + " at inputIndex " + failedInputIndexOnDestTa + " version "
            + readErrorEvent.getVersion());
      }
      // source host: where the data input is supposed to come from
      String sHost = sourceAttempt.getNodeId().getHost();
      // destination: where the data is tried to be fetched to
      String dHost = readErrorEvent.getDestinationLocalhostName();

      LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getTaskAttemptID(),
          sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);

      // The vertex keeps, for every upstream host, the distinct downstream hosts that blamed it.
      Map<String, Set<String>> downstreamBlamingHosts =
          sourceAttempt.getVertex().getDownstreamBlamingHosts();
      Set<String> hostsBlamingSource = downstreamBlamingHosts.get(sHost);
      if (hostsBlamingSource == null) {
        LOG.info("Host {} is blamed for fetch failure from {} for the first time", sHost, dHost);
        hostsBlamingSource = new HashSet<>();
        downstreamBlamingHosts.put(sHost, hostsBlamingSource);
      }
      hostsBlamingSource.add(dHost);

      int currentNumberOfFailingDownstreamHosts = hostsBlamingSource.size();
      int numNodes = getNumNodes(sourceAttempt);
      // Guard against division by zero when the node count is unknown.
      float hostFailureFraction =
          numNodes > 0 ? ((float) currentNumberOfFailingDownstreamHosts) / numNodes : 0;
      double maxAllowedHostFailureFraction = sourceAttempt.getVertex().getVertexConfig()
          .getMaxAllowedDownstreamHostFailuresFraction();

      boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost =
          hostFailureFraction > maxAllowedHostFailureFraction;
      if (tooManyDownstreamHostsBlamedTheSameUpstreamHost) {
        LOG.info("Host will be marked fail: {} because of host failure fraction {} is beyond the limit {}", sHost,
            hostFailureFraction, maxAllowedHostFailureFraction);
      }

      long time = sourceAttempt.clock.getTime();

      // Remember when each distinct consumer attempt first reported a read error against us;
      // the time limit is measured from that first report.
      Long firstErrReportTime = sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
      if (firstErrReportTime == null) {
        sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
        firstErrReportTime = time;
      }
      int uniqueFailedOutputReports = sourceAttempt.uniquefailedOutputReports.size();

      int maxAllowedOutputFailures = sourceAttempt.getVertex().getVertexConfig()
          .getMaxAllowedOutputFailures();
      int maxAllowedTimeForTaskReadErrorSec = sourceAttempt.getVertex()
          .getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec();
      double maxAllowedOutputFailuresFraction = sourceAttempt.getVertex()
          .getVertexConfig().getMaxAllowedOutputFailuresFraction();

      int readErrorTimespanSec = (int) ((time - firstErrReportTime) / 1000);
      boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec;

      int runningTasks = sourceAttempt.appContext.getCurrentDAG().getVertex(
          failedDestTaId.getVertexID()).getRunningTasks();
      float failureFraction =
          runningTasks > 0 ? ((float) uniqueFailedOutputReports) / runningTasks : 0;
      boolean withinFailureFractionLimits =
          (failureFraction <= maxAllowedOutputFailuresFraction);
      boolean withinOutputFailureLimits =
          (uniqueFailedOutputReports < maxAllowedOutputFailures);

      // If needed we can launch a background task without failing this task
      // to generate a copy of the output just in case.
      // If needed we can consider only running consumer tasks
      if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits
          && !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())
          && !tooManyDownstreamHostsBlamedTheSameUpstreamHost) {
        return sourceAttempt.getInternalState();
      }

      // TODO at some point. Nodes may be interested in FetchFailure info.
      // Can be used to blacklist nodes.
      String message = sourceAttempt.getTaskAttemptID() + " being failed for too many output errors. "
          + "failureFraction=" + failureFraction
          + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION="
          + maxAllowedOutputFailuresFraction
          + ", uniquefailedOutputReports=" + uniqueFailedOutputReports
          + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
          + ", hostFailureFraction=" + hostFailureFraction
          + " (" + currentNumberOfFailingDownstreamHosts + " / " + numNodes + ")"
          + ", MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION="
          + maxAllowedHostFailureFraction
          + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
          + maxAllowedTimeForTaskReadErrorSec
          + ", readErrorTimespan=" + readErrorTimespanSec
          + ", isLocalFetch=" + readErrorEvent.isLocalFetch()
          + ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource();

      LOG.info(message);
      sourceAttempt.addDiagnosticInfo(message);
      // send input failed event
      sourceAttempt.sendInputFailedToConsumers();
      // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
      if (sourceAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
        (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(
            sourceAttempt, event);
        return TaskAttemptStateInternal.FAILED;
      } else {
        (new TerminatedWhileRunningTransition(FAILED_HELPER)).transition(
            sourceAttempt, event);
        return TaskAttemptStateInternal.FAIL_IN_PROGRESS;
      }
    }