in src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java [1045:1178]
/**
 * Scores one piece of historical data interval by interval and hands the
 * resulting anomaly results to {@code storeAnomalyResultAndRunNextPiece}.
 *
 * <p>Starting at {@code pieceStartTime}, at most {@code pieceSize} intervals
 * (a value declared outside this method) are visited, stopping early once the
 * interval start reaches {@code dataEndTime}. For every interval exactly one
 * {@link AnomalyResult} is produced:
 * <ul>
 *   <li>if the feature manager yields no processed features, a result that
 *       only carries an error message ("No full shingle ..." when raw
 *       features exist, "No data ..." otherwise);</li>
 *   <li>otherwise the processed point is fed to the task's TRCF model and the
 *       result is built from the returned descriptor.</li>
 * </ul>
 *
 * <p>When the detector has no custom result index the results are stored
 * directly. Otherwise the task user's name and roles are injected through
 * {@code InjectSecurity} before storing.
 *
 * @param adTask            task being executed; supplies detector, ids, entity and user
 * @param dataPoints        feature values keyed by interval start time (epoch millis);
 *                          a missing key is treated as {@code Optional.empty()}
 * @param pieceStartTime    start time (epoch millis) of the first interval in this piece
 * @param pieceEndTime      forwarded unchanged to {@code storeAnomalyResultAndRunNextPiece}
 * @param dataStartTime     forwarded unchanged to {@code storeAnomalyResultAndRunNextPiece}
 * @param dataEndTime       exclusive upper bound for interval start times; also forwarded
 * @param interval          interval length in milliseconds
 * @param executeStartTime  execution start time recorded in every produced result
 * @param internalListener  forwarded to the store step; {@code onFailure} is invoked here
 *                          if the security-injecting branch throws
 */
private void detectAnomaly(
    ADTask adTask,
    Map<Long, Optional<double[]>> dataPoints,
    long pieceStartTime,
    long pieceEndTime,
    long dataStartTime,
    long dataEndTime,
    long interval,
    Instant executeStartTime,
    ActionListener<String> internalListener
) {
    final String taskId = adTask.getTaskId();
    final ThresholdedRandomCutForest trcf = adTaskCacheManager.getTRcfModel(taskId);
    final Deque<Map.Entry<Long, Optional<double[]>>> shingle = adTaskCacheManager.getShingle(taskId);
    final List<AnomalyResult> anomalyResults = new ArrayList<>();

    long windowStart = pieceStartTime;
    for (int step = 0; step < pieceSize && windowStart < dataEndTime; step++) {
        final long windowEnd = windowStart + interval;

        // Data points are keyed by the start of their interval; absent keys mean "no data".
        final Optional<double[]> dataPoint = dataPoints.getOrDefault(windowStart, Optional.empty());
        final SinglePointFeatures feature = featureManager
            .getShingledFeatureForHistoricalAnalysis(adTask.getDetector(), shingle, dataPoint, windowEnd);

        // Raw feature values are attached to the result whenever they exist,
        // regardless of whether a processed point could be produced.
        final boolean hasRawFeatures = feature.getUnprocessedFeatures().isPresent();
        final List<FeatureData> featureData = hasRawFeatures
            ? ParseUtils.getFeatureData(feature.getUnprocessedFeatures().get(), adTask.getDetector())
            : null;

        final AnomalyResult anomalyResult;
        if (feature.getProcessedFeatures().isPresent()) {
            final double[] point = feature.getProcessedFeatures().get();
            // The second argument is a timestamp placeholder (0); the real data
            // timestamp is meant to be supplied there in the future.
            final AnomalyDescriptor descriptor = trcf.process(point, 0);
            final double score = descriptor.getRCFScore();

            // The first positive score flips the task's "threshold model trained" flag.
            final boolean thresholdNotYetTrained = !adTaskCacheManager.isThresholdModelTrained(taskId);
            if (thresholdNotYetTrained && score > 0) {
                adTaskCacheManager.setThresholdModelTrained(taskId, true);
            }

            anomalyResult = AnomalyResult
                .fromRawTRCFResult(
                    adTask.getDetectorId(),
                    adTask.getDetector().getDetectorIntervalInMilliseconds(),
                    adTask.getDetectorLevelTaskId(),
                    score,
                    descriptor.getAnomalyGrade(),
                    descriptor.getDataConfidence(),
                    featureData,
                    Instant.ofEpochMilli(windowStart),
                    Instant.ofEpochMilli(windowEnd),
                    executeStartTime,
                    Instant.now(),
                    null,
                    adTask.getEntity(),
                    adTask.getDetector().getUser(),
                    anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
                    adTask.getEntityModelId(),
                    modelManager.normalizeAttribution(trcf.getForest(), descriptor.getRelevantAttribution()),
                    descriptor.getRelativeIndex(),
                    descriptor.getPastValues(),
                    descriptor.getExpectedValuesList(),
                    descriptor.getLikelihoodOfValues(),
                    descriptor.getThreshold()
                );
        } else {
            // No processed point: record why this interval could not be scored.
            final String error = hasRawFeatures
                ? "No full shingle in current detection window"
                : "No data in current detection window";
            anomalyResult = new AnomalyResult(
                adTask.getDetectorId(),
                adTask.getDetectorLevelTaskId(),
                featureData,
                Instant.ofEpochMilli(windowStart),
                Instant.ofEpochMilli(windowEnd),
                executeStartTime,
                Instant.now(),
                error,
                adTask.getEntity(),
                adTask.getDetector().getUser(),
                anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
                adTask.getEntityModelId()
            );
        }
        anomalyResults.add(anomalyResult);

        windowStart = windowEnd;
    }

    // Backward compatibility: a detector created while security was disabled has no
    // user attached, even if security was enabled later (e.g. after an upgrade).
    // In that case fall back to an empty user name and the default roles.
    final boolean hasTaskUser = adTask.getUser() != null;
    final String user = hasTaskUser ? adTask.getUser().getName() : "";
    final List<String> roles = hasTaskUser
        ? adTask.getUser().getRoles()
        : settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));

    final String resultIndex = adTask.getDetector().getResultIndex();
    if (resultIndex != null) {
        try (InjectSecurity injectSecurity = new InjectSecurity(adTask.getTaskId(), settings, client.threadPool().getThreadContext())) {
            // Inject the user's roles so that writing to the result index is
            // checked against that user's permissions.
            injectSecurity.inject(user, roles);
            // NOTE(review): try-with-resources closes injectSecurity when this block exits,
            // and the callback below closes it again whenever the callee runs it. Whether a
            // second close is harmless depends on InjectSecurity, which is not visible here.
            storeAnomalyResultAndRunNextPiece(
                adTask,
                pieceEndTime,
                dataStartTime,
                dataEndTime,
                interval,
                internalListener,
                anomalyResults,
                resultIndex,
                injectSecurity::close
            );
        } catch (Exception exception) {
            logger.error("Failed to inject user roles", exception);
            internalListener.onFailure(exception);
        }
    } else {
        // No result index configured (resultIndex is null here): store the results
        // directly, without security injection and without a completion callback.
        storeAnomalyResultAndRunNextPiece(
            adTask,
            pieceEndTime,
            dataStartTime,
            dataEndTime,
            interval,
            internalListener,
            anomalyResults,
            resultIndex,
            null
        );
    }
}