void onResourceCompletedWithUnknownStatus()

in samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java [504:612]


  void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, String containerId, String processorId,
      int exitStatus) {
    LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus);
    Instant now = Instant.now();
    state.failedContainers.incrementAndGet();
    state.jobHealthy.set(false);

    // One more processor now needs a replacement cluster resource.
    state.neededProcessors.incrementAndGet();
    // Look up the host where this processor's container was previously running.
    String lastSeenOn = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
        .map(ProcessorLocality::host)
        .orElse(null);
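    // Fall back to ANY_HOST when host affinity is disabled or the previous host is unknown.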
    if (!hostAffinityEnabled || StringUtils.isBlank(lastSeenOn)) {
      lastSeenOn = ResourceRequestState.ANY_HOST;
    }
    LOG.info("Container ID: {} for Processor ID: {} was last seen on host {}.", containerId, processorId, lastSeenOn);
    // A container failed for an unknown reason. Check whether we need to
    // shut down the whole app master because too many container failures
    // have happened. The rules for failing are: the failure count for a
    // processor must be > the configured retry count, and the last failure
    // (the one prior to this one) must have happened less than the retry
    // window ms ago. If the retry count is set to 0, the app master will
    // fail on any container failure. If the retry count is set to a
    // number < 0, a container failure will never trigger an app master
    // failure.
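    // Example: with retryCount = 3 and retryWindowMs = 300000 (5 minutes),
    // the processor is re-requested after each of its first 3 failures; a
    // 4th failure arriving within 5 minutes of the previous retry pushes the
    // count above the retry limit and stops further retries.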
    int retryCount = clusterManagerConfig.getContainerRetryCount();
    int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
    int currentFailCount;
    boolean retryContainerRequest = true;

    if (retryCount == 0) {
      // The job-failure criteria are met only if container failures are configured to fail the job.
      jobFailureCriteriaMet = clusterManagerConfig.shouldFailJobAfterContainerRetries();
      if (jobFailureCriteriaMet) {
        LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
            "so shutting down the application master and marking the job as failed.", processorId, containerId);
      } else {
        LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
            "but the job will continue to run with the failed container.", processorId, containerId);
        state.failedProcessors.put(processorId, resourceStatus);
      }
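      // Either way, never re-request the container when retries are disabled.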
      retryContainerRequest = false;
    } else if (retryCount > 0) {
      long durationSinceLastRetryMs;
      if (processorFailures.containsKey(processorId)) {
        ProcessorFailure failure = processorFailures.get(processorId);
        currentFailCount = failure.getCount() + 1;
        Duration lastRetryDelay = getRetryDelay(processorId);
        // The previous retry was issued at lastFailure + lastRetryDelay; measure elapsed time since then.
        Instant retryAttemptedAt = failure.getLastFailure().plus(lastRetryDelay);
        durationSinceLastRetryMs = now.toEpochMilli() - retryAttemptedAt.toEpochMilli();
        if (durationSinceLastRetryMs < 0) {
          // This should never happen without changes to the system clock or time travel. Log a warning just in case.
          LOG.warn("Last failure at: {} with a retry attempted at: {} which is supposed to be before current time of: {}",
              failure.getLastFailure(), retryAttemptedAt, now);
        }
      } else {
        // First observed failure for this processor; a zero duration keeps it inside the retry window below.
        currentFailCount = 1;
        durationSinceLastRetryMs = 0;
      }

      if (durationSinceLastRetryMs >= retryWindowMs) {
        LOG.info("Resetting failure count for Processor ID: {} back to 1, since last failure " +
            "(for Container ID: {}) was outside the bounds of the retry window.", processorId, containerId);

        // Reset counter back to 1, since the last failure for this
        // container happened outside the window boundary.
        currentFailCount = 1;
      }

      // If the fail count reaches (1 initial failure + max retries), fail the job.
      if (currentFailCount > retryCount) {
        LOG.error("Processor ID: {} (current Container ID: {}) has failed {} times. "
                + "This is greater that the retry count of {}."
                + "The failure occurred {} ms after the previous one, which is less than the retry window of {} ms.",
            processorId, containerId, currentFailCount, retryCount, durationSinceLastRetryMs, retryWindowMs);

        // We have too many failures within the window boundary, so stop
        // retrying and, if configured, shut down the app master.
        retryContainerRequest = false;
        if (clusterManagerConfig.shouldFailJobAfterContainerRetries()) {
          jobFailureCriteriaMet = true;
          LOG.error("Shutting down the application master and marking the job as failed after max retry attempts.");
          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
        } else {
          LOG.warn("Processor ID: {} with Container ID: {} failed after all retry attempts. Job will continue to run without this container.",
              processorId, containerId);
          state.failedProcessors.put(processorId, resourceStatus);
        }
      } else {
        LOG.info("Current failure count for Processor ID: {} is {}.", processorId, currentFailCount);
        Duration retryDelay = Duration.ZERO;
        if (!ResourceRequestState.ANY_HOST.equals(lastSeenOn) && currentFailCount == retryCount) {
          // On the final retry for a preferred host, add the configured last-retry delay before re-requesting.
          retryDelay = Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
        }
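        // Record this failure with its timestamp and chosen retry delay so the
        // next failure can evaluate the retry window against it.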
        processorFailures.put(processorId, new ProcessorFailure(currentFailCount, now, retryDelay));
        retryContainerRequest = true;
      }
    }

    if (retryContainerRequest) {
      Duration retryDelay = getRetryDelay(processorId);
      if (!retryDelay.isZero()) {
        LOG.info("Adding a delay of: {} seconds on the last container retry request for preferred host: {}",
            retryDelay.getSeconds(), lastSeenOn);
      }
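      // Re-request a resource for this processor (on its last seen host if affinity applies), applying any retry delay.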
      handleContainerStop(processorId, resourceStatus.getContainerId(), lastSeenOn, exitStatus, retryDelay);
    }
  }
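
For reference, the getRetryDelay helper and the ProcessorFailure value type used above fall outside this excerpt. Below is a minimal sketch of what they plausibly look like, assuming ProcessorFailure simply records the failure count, the failure time, and the delay chosen for the next retry; the accessor name getLastRetryDelay is an assumption here:

  private Duration getRetryDelay(String processorId) {
    // Use the delay recorded with the most recent failure, if any; otherwise retry immediately.
    return processorFailures.containsKey(processorId)
        ? processorFailures.get(processorId).getLastRetryDelay()
        : Duration.ZERO;
  }

  // Hypothetical immutable holder matching the accessors used above
  // (getCount(), getLastFailure()) plus the delay consumed by getRetryDelay.
  static final class ProcessorFailure {
    private final int count;
    private final Instant lastFailure;
    private final Duration lastRetryDelay;

    ProcessorFailure(int count, Instant lastFailure, Duration lastRetryDelay) {
      this.count = count;
      this.lastFailure = lastFailure;
      this.lastRetryDelay = lastRetryDelay;
    }

    int getCount() { return count; }
    Instant getLastFailure() { return lastFailure; }
    Duration getLastRetryDelay() { return lastRetryDelay; }
  }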