in src/main/java/org/opensearch/ad/ratelimit/CheckpointReadWorker.java [165:267]
// Builds the listener that handles the multi-get response for one batch of checkpoint reads.
//
// Per-item handling of a successful multi-get response:
// - IndexNotFoundException on any item: every request in toProcess is put into the cold start
//   queue and nothing else is done for this batch.
// - retryable failure: the model id is collected and handed to processCheckpointIteration.
//   (The retryable check runs before the overloaded check, so a failure matching both is
//   treated as retryable.)
// - overloaded failure: a cool-down is started; the item is not re-queued here.
// - any other failure: logged once per response, and the owning detector's exception is set
//   to an EndRunException built with a final argument of false.
// - checkpoint document missing: the request is put into the cold start queue.
// - checkpoint document found: the item is handed to processCheckpointIteration.
// If there is neither a found checkpoint nor a retryable item, processing stops there.
//
// If the multi-get call fails as a whole: overloaded starts a cool-down, retryable re-queues
// every request in toProcess via putAll, and anything else is logged.
//
// toProcess: the requests whose checkpoints were requested in this batch
// batchRequest: not read by this implementation
// returns: the listener to attach to the multi-get call
protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatureRequest> toProcess, MultiGetRequest batchRequest) {
    return ActionListener.wrap(response -> {
        final MultiGetItemResponse[] itemResponses = response.getResponses();
        Map<String, MultiGetItemResponse> successfulRequests = new HashMap<>();

        // lazy init since we don't expect retryable requests to happen often
        Set<String> retryableRequests = null;
        Set<String> notFoundModels = null;
        // Both failure kinds below can repeat for many entities of the same batch; log each
        // kind at most once per response so a single bad batch does not flood the logs.
        boolean printedUnexpectedFailure = false;
        boolean printedOverloadedFailure = false;
        // contain requests that we will set the detector's exception to
        // EndRunException (stop now = false)
        Map<String, Exception> stopDetectorRequests = null;
        for (MultiGetItemResponse itemResponse : itemResponses) {
            String modelId = itemResponse.getId();
            if (itemResponse.isFailed()) {
                final Exception failure = itemResponse.getFailure().getFailure();
                if (failure instanceof IndexNotFoundException) {
                    for (EntityRequest origRequest : toProcess) {
                        // If it is checkpoint index not found exception, I don't
                        // need to retry as checkpoint read is bound to fail. Just
                        // send everything to the cold start queue and return.
                        entityColdStartQueue.put(origRequest);
                    }
                    return;
                } else if (ExceptionUtil.isRetryAble(failure)) {
                    if (retryableRequests == null) {
                        retryableRequests = new HashSet<>();
                    }
                    retryableRequests.add(modelId);
                } else if (ExceptionUtil.isOverloaded(failure)) {
                    // Overloaded items are not re-queued here; only the cool-down is triggered.
                    if (!printedOverloadedFailure) {
                        LOG.error("too many get AD model checkpoint requests or shard not available");
                        printedOverloadedFailure = true;
                    }
                    setCoolDownStart();
                } else {
                    // some unexpected bug occurred or cluster is unstable (e.g., ClusterBlockException) or index is red (e.g.
                    // NoShardAvailableActionException) while fetching a checkpoint. As this might happen for a large amount
                    // of entities, we don't want to flood logs with such exception trace. Only print it once.
                    if (!printedUnexpectedFailure) {
                        LOG.error("Unexpected failure", failure);
                        printedUnexpectedFailure = true;
                    }
                    if (stopDetectorRequests == null) {
                        stopDetectorRequests = new HashMap<>();
                    }
                    stopDetectorRequests.put(modelId, failure);
                }
            } else if (!itemResponse.getResponse().isExists()) {
                // lazy init as we don't expect retrying happens often
                if (notFoundModels == null) {
                    notFoundModels = new HashSet<>();
                }
                notFoundModels.add(modelId);
            } else {
                successfulRequests.put(modelId, itemResponse);
            }
        }

        // deal with not found model
        if (notFoundModels != null) {
            for (EntityRequest origRequest : toProcess) {
                Optional<String> modelId = origRequest.getModelId();
                if (modelId.isPresent() && notFoundModels.contains(modelId.get())) {
                    // submit to cold start queue
                    entityColdStartQueue.put(origRequest);
                }
            }
        }

        // deal with failures that we will retry for a limited amount of times
        // before stopping the detector
        // We cannot just loop over stopDetectorRequests instead of toProcess
        // because we need detector id from toProcess' elements. stopDetectorRequests only has model id.
        if (stopDetectorRequests != null) {
            for (EntityRequest origRequest : toProcess) {
                Optional<String> modelId = origRequest.getModelId();
                if (modelId.isPresent() && stopDetectorRequests.containsKey(modelId.get())) {
                    String adID = origRequest.detectorId;
                    nodeStateManager
                        .setException(
                            adID,
                            new EndRunException(adID, CommonErrorMessages.BUG_RESPONSE, stopDetectorRequests.get(modelId.get()), false)
                        );
                }
            }
        }

        if (successfulRequests.isEmpty() && (retryableRequests == null || retryableRequests.isEmpty())) {
            // don't need to proceed further since no checkpoint is available
            return;
        }

        processCheckpointIteration(0, toProcess, successfulRequests, retryableRequests);
    }, exception -> {
        if (ExceptionUtil.isOverloaded(exception)) {
            LOG.error("too many get AD model checkpoint requests or shard not available");
            setCoolDownStart();
        } else if (ExceptionUtil.isRetryAble(exception)) {
            // retry all of them
            putAll(toProcess);
        } else {
            LOG.error("Fail to restore models", exception);
        }
    });
}