private void detectAnomaly()

in src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java [1045:1178]


    private void detectAnomaly(
        ADTask adTask,
        Map<Long, Optional<double[]>> dataPoints,
        long pieceStartTime,
        long pieceEndTime,
        long dataStartTime,
        long dataEndTime,
        long interval,
        Instant executeStartTime,
        ActionListener<String> internalListener
    ) {
        String taskId = adTask.getTaskId();
        ThresholdedRandomCutForest trcf = adTaskCacheManager.getTRcfModel(taskId);
        Deque<Map.Entry<Long, Optional<double[]>>> shingle = adTaskCacheManager.getShingle(taskId);

        List<AnomalyResult> anomalyResults = new ArrayList<>();

        long intervalEndTime = pieceStartTime;
        for (int i = 0; i < pieceSize && intervalEndTime < dataEndTime; i++) {
            Optional<double[]> dataPoint = dataPoints.containsKey(intervalEndTime) ? dataPoints.get(intervalEndTime) : Optional.empty();
            intervalEndTime = intervalEndTime + interval;
            SinglePointFeatures feature = featureManager
                .getShingledFeatureForHistoricalAnalysis(adTask.getDetector(), shingle, dataPoint, intervalEndTime);
            List<FeatureData> featureData = null;
            if (feature.getUnprocessedFeatures().isPresent()) {
                featureData = ParseUtils.getFeatureData(feature.getUnprocessedFeatures().get(), adTask.getDetector());
            }
            if (!feature.getProcessedFeatures().isPresent()) {
                String error = feature.getUnprocessedFeatures().isPresent()
                    ? "No full shingle in current detection window"
                    : "No data in current detection window";
                AnomalyResult anomalyResult = new AnomalyResult(
                    adTask.getDetectorId(),
                    adTask.getDetectorLevelTaskId(),
                    featureData,
                    Instant.ofEpochMilli(intervalEndTime - interval),
                    Instant.ofEpochMilli(intervalEndTime),
                    executeStartTime,
                    Instant.now(),
                    error,
                    adTask.getEntity(),
                    adTask.getDetector().getUser(),
                    anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
                    adTask.getEntityModelId()
                );
                anomalyResults.add(anomalyResult);
            } else {
                double[] point = feature.getProcessedFeatures().get();
                // 0 is placeholder for timestamp. In the future, we will add
                // data time stamp there.
                AnomalyDescriptor descriptor = trcf.process(point, 0);
                double score = descriptor.getRCFScore();
                if (!adTaskCacheManager.isThresholdModelTrained(taskId) && score > 0) {
                    adTaskCacheManager.setThresholdModelTrained(taskId, true);
                }

                AnomalyResult anomalyResult = AnomalyResult
                    .fromRawTRCFResult(
                        adTask.getDetectorId(),
                        adTask.getDetector().getDetectorIntervalInMilliseconds(),
                        adTask.getDetectorLevelTaskId(),
                        score,
                        descriptor.getAnomalyGrade(),
                        descriptor.getDataConfidence(),
                        featureData,
                        Instant.ofEpochMilli(intervalEndTime - interval),
                        Instant.ofEpochMilli(intervalEndTime),
                        executeStartTime,
                        Instant.now(),
                        null,
                        adTask.getEntity(),
                        adTask.getDetector().getUser(),
                        anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
                        adTask.getEntityModelId(),
                        modelManager.normalizeAttribution(trcf.getForest(), descriptor.getRelevantAttribution()),
                        descriptor.getRelativeIndex(),
                        descriptor.getPastValues(),
                        descriptor.getExpectedValuesList(),
                        descriptor.getLikelihoodOfValues(),
                        descriptor.getThreshold()
                    );
                anomalyResults.add(anomalyResult);
            }
        }

        String user;
        List<String> roles;
        if (adTask.getUser() == null) {
            // It's possible that user create domain with security disabled, then enable security
            // after upgrading. This is for BWC, for old detectors which created when security
            // disabled, the user will be null.
            user = "";
            roles = settings.getAsList("", ImmutableList.of("all_access", "AmazonES_all_access"));
        } else {
            user = adTask.getUser().getName();
            roles = adTask.getUser().getRoles();
        }
        String resultIndex = adTask.getDetector().getResultIndex();

        if (resultIndex == null) {
            // if result index is null, store anomaly result directly
            storeAnomalyResultAndRunNextPiece(
                adTask,
                pieceEndTime,
                dataStartTime,
                dataEndTime,
                interval,
                internalListener,
                anomalyResults,
                resultIndex,
                null
            );
            return;
        }

        try (InjectSecurity injectSecurity = new InjectSecurity(adTask.getTaskId(), settings, client.threadPool().getThreadContext())) {
            // Injecting user role to verify if the user has permissions to write result to result index.
            injectSecurity.inject(user, roles);
            storeAnomalyResultAndRunNextPiece(
                adTask,
                pieceEndTime,
                dataStartTime,
                dataEndTime,
                interval,
                internalListener,
                anomalyResults,
                resultIndex,
                () -> injectSecurity.close()
            );
        } catch (Exception exception) {
            logger.error("Failed to inject user roles", exception);
            internalListener.onFailure(exception);
        }
    }