in src/main/java/org/opensearch/ad/EntityProfileRunner.java [202:314]
private void getJob(
String detectorId,
Entity entityValue,
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
EntityProfileResponse entityProfileResponse,
ActionListener<EntityProfile> listener
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + entityValue + " of detector " + detectorId,
false
);
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
EntityProfile.Builder builder = new EntityProfile.Builder();
if (false == job.isEnabled()) {
delegateListener.onResponse(builder.build());
} else {
delegateListener.onResponse(builder.modelProfile(entityProfileResponse.getModelProfile()).build());
}
}
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
profileStateRelated(
entityProfileResponse.getTotalUpdates(),
detectorId,
entityValue,
profilesToCollect,
detector,
job,
delegateListener
);
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
long enabledTimeMs = job.getEnabledTime().toEpochMilli();
SearchRequest lastSampleTimeRequest = createLastSampleTimeRequest(
detectorId,
enabledTimeMs,
entityValue,
detector.getResultIndex()
);
EntityProfile.Builder builder = new EntityProfile.Builder();
Optional<Boolean> isActiveOp = entityProfileResponse.isActive();
if (isActiveOp.isPresent()) {
builder.isActive(isActiveOp.get());
}
builder.lastActiveTimestampMs(entityProfileResponse.getLastActiveMs());
client.search(lastSampleTimeRequest, ActionListener.wrap(searchResponse -> {
Optional<Long> latestSampleTimeMs = ParseUtils.getLatestDataTime(searchResponse);
if (latestSampleTimeMs.isPresent()) {
builder.lastSampleTimestampMs(latestSampleTimeMs.get());
}
delegateListener.onResponse(builder.build());
}, exception -> {
// sth wrong like result index not created. Return what we have
if (exception instanceof IndexNotFoundException) {
// don't print out stack trace since it is not helpful
logger.info("Result index hasn't been created", exception.getMessage());
} else {
logger.warn("fail to get last sample time", exception);
}
delegateListener.onResponse(builder.build());
}));
}
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
sendUnknownState(profilesToCollect, entityValue, true, listener);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
logger.info(exception.getMessage());
sendUnknownState(profilesToCollect, entityValue, true, listener);
} else {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception);
listener.onFailure(exception);
}
}));
}