in src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java [147:284]
private void prepareProfile(
AnomalyDetector detector,
ActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();
boolean isMultiEntityDetector = detector.isMultientityDetector();
int totalResponsesToWait = 0;
if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
totalResponsesToWait++;
}
// total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide
// when to consolidate results and return to users
if (isMultiEntityDetector) {
if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)
|| profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(DetectorProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
totalResponsesToWait++;
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
totalResponsesToWait++;
}
}
MultiResponsesDelegateActionListener<DetectorProfile> delegateListener =
new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
totalResponsesToWait,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId,
false
);
if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
adTaskManager.getAndExecuteOnLatestDetectorLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTask -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (adTask.isPresent()) {
long lastUpdateTimeMs = adTask.get().getLastUpdateTime().toEpochMilli();
// if state index hasn't been updated, we should not use the error field
// For example, before a detector is enabled, if the error message contains
// the phrase "stopped due to blah", we should not show this when the detector
// is enabled.
if (lastUpdateTimeMs > enabledTimeMs && adTask.get().getError() != null) {
profileBuilder.error(adTask.get().getError());
}
delegateListener.onResponse(profileBuilder.build());
} else {
// detector state for this detector does not exist
delegateListener.onResponse(profileBuilder.build());
}
}, transportService, false, delegateListener);
}
// total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide
// when to consolidate results and return to users
if (isMultiEntityDetector) {
if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) {
profileEntityStats(delegateListener, detector);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)
|| profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(DetectorProfileName.STATE)) {
profileModels(detector, profilesToCollect, job, true, delegateListener);
}
if (profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestHistoricalTaskProfile(detectorId, transportService, null, delegateListener);
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)) {
profileModels(detector, profilesToCollect, job, false, delegateListener);
}
if (profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestHistoricalTaskProfile(detectorId, transportService, null, delegateListener);
}
}
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
onGetDetectorForPrepare(detectorId, listener, profilesToCollect);
}
}, exception -> {
if (ExceptionUtil.isIndexNotAvailable(exception)) {
logger.info(exception.getMessage());
onGetDetectorForPrepare(detectorId, listener, profilesToCollect);
} else {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId);
listener.onFailure(exception);
}
}));
}