in samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java [504:612]
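/**
 * Handles a container that completed with an unknown (i.e. failure) status: records the failure in
 * the application state, looks up where the processor last ran, and then either retries the
 * container request or marks the job as failed, based on the configured container retry policy.
 */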
void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, String containerId, String processorId,
int exitStatus) {
LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus);
Instant now = Instant.now();
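// Record the failure: bump the failed-container count, mark the job unhealthy, and note that a
// replacement processor is now needed.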
state.failedContainers.incrementAndGet();
state.jobHealthy.set(false);
state.neededProcessors.incrementAndGet();
// Find the host where this processor's container was last running.
String lastSeenOn = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
.map(ProcessorLocality::host)
.orElse(null);
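// Fall back to ANY_HOST when host affinity is disabled or no previous locality is known.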
if (!hostAffinityEnabled || StringUtils.isBlank(lastSeenOn)) {
lastSeenOn = ResourceRequestState.ANY_HOST;
}
LOG.info("Container ID: {} for Processor ID: {} was last seen on host {}.", containerId, processorId, lastSeenOn);
// A container failed for an unknown reason. Check whether we need to
// shut down the whole app master because too many container failures
// have happened. The rules for failing are: the failure count for a
// processor ID must be > the configured retry count, and the last
// failure (the one prior to this one) must have happened less than
// the retry window ms ago. If the retry count is set to 0, any
// container failure fails the app master (unless the job is configured
// to continue running without the failed container). If the retry
// count is set to a number < 0, a container failure will never
// trigger an app master failure.
int retryCount = clusterManagerConfig.getContainerRetryCount();
int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
int currentFailCount;
boolean retryContainerRequest = true;
if (retryCount == 0) {
// With retry count 0, the job-failure criteria are met only if container failures are configured to fail the job.
jobFailureCriteriaMet = clusterManagerConfig.shouldFailJobAfterContainerRetries();
if (jobFailureCriteriaMet) {
LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
"so shutting down the application master and marking the job as failed.", processorId, containerId);
} else {
LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, " +
"but the job will continue to run with the failed container.", processorId, containerId);
state.failedProcessors.put(processorId, resourceStatus);
}
retryContainerRequest = false;
} else if (retryCount > 0) {
long durationSinceLastRetryMs;
if (processorFailures.containsKey(processorId)) {
ProcessorFailure failure = processorFailures.get(processorId);
currentFailCount = failure.getCount() + 1;
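// The previous retry request was issued getRetryDelay(processorId) after the last failure, so
// measure the elapsed time from that retry attempt rather than from the failure itself.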
Duration lastRetryDelay = getRetryDelay(processorId);
Instant retryAttemptedAt = failure.getLastFailure().plus(lastRetryDelay);
durationSinceLastRetryMs = now.toEpochMilli() - retryAttemptedAt.toEpochMilli();
if (durationSinceLastRetryMs < 0) {
// This should never happen without changes to the system clock or time travel. Log a warning just in case.
LOG.warn("Last failure at: {} with a retry attempted at: {} which is supposed to be before current time of: {}",
failure.getLastFailure(), retryAttemptedAt, now);
}
} else {
currentFailCount = 1;
durationSinceLastRetryMs = 0;
}
if (durationSinceLastRetryMs >= retryWindowMs) {
LOG.info("Resetting failure count for Processor ID: {} back to 1, since last failure " +
"(for Container ID: {}) was outside the bounds of the retry window.", processorId, containerId);
// Reset counter back to 1, since the last failure for this
// container happened outside the window boundary.
currentFailCount = 1;
}
// If the fail count exceeds the retry count (i.e. reaches 1 initial failure + max retries), fail the job.
if (currentFailCount > retryCount) {
LOG.error("Processor ID: {} (current Container ID: {}) has failed {} times. "
+ "This is greater that the retry count of {}."
+ "The failure occurred {} ms after the previous one, which is less than the retry window of {} ms.",
processorId, containerId, currentFailCount, retryCount, durationSinceLastRetryMs, retryWindowMs);
// We have too many failures, and we're within the window
// boundary, so shut down the app master.
retryContainerRequest = false;
if (clusterManagerConfig.shouldFailJobAfterContainerRetries()) {
jobFailureCriteriaMet = true;
LOG.error("Shutting down the application master and marking the job as failed after max retry attempts.");
state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
} else {
LOG.warn("Processor ID: {} with Container ID: {} failed after all retry attempts. Job will continue to run without this container.",
processorId, containerId);
state.failedProcessors.put(processorId, resourceStatus);
}
} else {
LOG.info("Current failure count for Processor ID: {} is {}.", processorId, currentFailCount);
Duration retryDelay = Duration.ZERO;
if (!ResourceRequestState.ANY_HOST.equals(lastSeenOn) && currentFailCount == retryCount) {
// On the final retry for a preferred host, add the configured last-retry delay.
retryDelay = Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
}
processorFailures.put(processorId, new ProcessorFailure(currentFailCount, now, retryDelay));
retryContainerRequest = true;
}
}
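// Retry the container request: ask for a replacement on the last seen host (or ANY_HOST),
// applying any retry delay recorded for this processor.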
if (retryContainerRequest) {
Duration retryDelay = getRetryDelay(processorId);
if (!retryDelay.isZero()) {
LOG.info("Adding a delay of: {} seconds on the last container retry request for preferred host: {}",
retryDelay.getSeconds(), lastSeenOn);
}
handleContainerStop(processorId, resourceStatus.getContainerId(), lastSeenOn, exitStatus, retryDelay);
}
}
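// Illustrative only, not part of this excerpt: a minimal sketch of what getRetryDelay(processorId),
// used above, could look like, assuming it returns the retry delay recorded with the processor's
// most recent ProcessorFailure (Duration.ZERO if the processor has no recorded failures). The
// accessor name getLastRetryDelay() is an assumption based on how ProcessorFailure is constructed above.
//
//   private Duration getRetryDelay(String processorId) {
//     ProcessorFailure lastFailure = processorFailures.get(processorId);
//     return lastFailure == null ? Duration.ZERO : lastFailure.getLastRetryDelay();
//   }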